table

package
v0.14.0 Latest
This package is not in the latest version of its module.
Published: May 20, 2026 License: Apache-2.0 Imports: 19 Imported by: 7

README

About Chunkers

Chunkers are used to split a table into multiple chunks for copying. The downsides of a chunk being too large are as follows:

  • Replicas fall behind the primary (Spirit doesn't support high fidelity read-replicas, but at a certain point it impacts DR).
  • Locks are held for the duration of a chunk copy, so a large chunk can block other operations on the table.

All chunkers support "dynamic chunking": from a configuration perspective, you specify the ideal chunk size in time-based units (e.g., 500ms), and the chunker adjusts the chunk size to meet that target. This tends to be a better approach than specifying a fixed chunk size, because the appropriate chunk size can vary wildly depending on the table. As the new table gets larger, we typically see the chunk size reduce significantly to compensate for larger insert times. We believe this is more likely to occur on Aurora than on MySQL because, on IO-bound workloads, Aurora does not have the change buffer.

Spirit should be aggressive in copying, but there should only be minimal elevation in p99 response times. If you consider that a table regularly has DML queries that take 1-5ms, then it is reasonable to assume a chunk time of 500ms will elevate some queries to 505ms. Assuming this contention is limited, it may only be observed by the pMax and not the p99. It is usually application-dependent how much of a latency hit is acceptable. Our belief is that 500ms is on the high end of acceptable for defaults, and users will typically lower it rather than increase it. We limit the maximum chunk time to 5s because it is unlikely that users can tolerate larger than a 5s latency hit for a single query on an OLTP system. Since we also adjust various lock wait timeouts based on the assumption that chunks are about this size, increasing beyond 5s would require additional tuning.

Chunking becomes a complicated problem because data can have an uneven distribution, and some tables have composite or unusual data types for PRIMARY KEYs. We have chosen to solve the chunking problem by not using a one-size-fits-all approach, but rather an interface that has two primary implementations: composite and optimistic.

Composite Chunker

The composite chunker is our newest chunker, and it is selected unless the table has an AUTO_INCREMENT single-column PRIMARY KEY.

Its implementation is very similar to how the chunker in gh-ost works:

  • A SELECT statement is performed to find the exact PRIMARY KEY value of the row that is chunkSize rows larger than the current chunk pointer:
SELECT pk FROM table WHERE pk > chunkPointer ORDER BY pk LIMIT 1 OFFSET {chunkSize}
  • An INSERT .. SELECT statement is run on the table to copy between the last chunk pointer and the new value.
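
The two steps above can be mimicked in memory to see how the chunk pointer advances. This is a minimal sketch, not Spirit's implementation: `nextUpperBound` is a hypothetical helper, the sorted `keys` slice stands in for the PRIMARY KEY index, and the half-open ranges printed by `main` are an assumption about how the copy statement is bounded.

```go
package main

import (
	"fmt"
	"sort"
)

// nextUpperBound is a hypothetical stand-in for:
//
//	SELECT pk FROM table WHERE pk > pointer ORDER BY pk LIMIT 1 OFFSET chunkSize
//
// keys must be sorted ascending. ok is false when too few keys remain above
// the pointer, in which case the final chunk is left open-ended.
func nextUpperBound(keys []int64, pointer int64, chunkSize int) (upper int64, ok bool) {
	// Index of the first key strictly greater than the pointer.
	start := sort.Search(len(keys), func(i int) bool { return keys[i] > pointer })
	if start+chunkSize >= len(keys) {
		return 0, false
	}
	return keys[start+chunkSize], true
}

func main() {
	keys := []int64{3, 7, 8, 20, 21, 50, 90, 91}
	pointer := int64(0)
	for {
		upper, ok := nextUpperBound(keys, pointer, 3)
		if !ok {
			fmt.Printf("copy rows where pk >= %d\n", pointer)
			return
		}
		fmt.Printf("copy rows where pk >= %d AND pk < %d\n", pointer, upper)
		pointer = upper
	}
}
```

Because the upper bound is always a key that actually exists, gaps in the key space do not shrink the number of rows per chunk, which is the property the composite chunker relies on.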

The composite chunker is very good at dividing the chunks up equally, since barring a brief race condition each chunk will match exactly the chunkSize value. The main downside is that it becomes a little bit wasteful when you have AUTO_INCREMENT PRIMARY KEYs and rarely delete data. In this case, you waste the initial SELECT statement, since the client could easily calculate the next chunk pointer by adding chunkSize to the previous chunk pointer. A second issue is that the KeyAboveHighWatermark optimization is more complex for the composite chunker than for the optimistic chunker. It works correctly for numeric, binary, and temporal primary key types, but for VARCHAR/TEXT columns with collations, Go's byte-order comparison may differ from MySQL's collation order (e.g., 'aa' = 'AA' in utf8mb4_0900_ai_ci). Any discrepancies are caught by the checksum phase, since watermark optimizations are disabled before checksumming begins (see issue #479).

Many of our use cases have AUTO_INCREMENT PRIMARY KEYs, so despite the composite chunker also being able to support non-composite PRIMARY KEYs, we have no plans to switch to it entirely.

Optimistic Chunker

The optimistic chunker was our first chunker, and it's ideal for one of our main use cases: AUTO_INCREMENT PRIMARY KEYs.

Its basic implementation is as follows:

  • Find the min/max of the PRIMARY KEY column.
  • Have a special chunk retrieve values less than the min value (because the value is cached, it's possible a small number of rows exist at the beginning).
  • Advance by chunk size until you reach the max value.
  • Have a special chunk retrieve values greater than the max value (because the value is cached, it's possible a small number of rows exist at the end of the table).
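
The steps above can be sketched as a function that turns a cached min/max into a sequence of WHERE fragments. This is an illustration under stated assumptions rather than Spirit's code: `optimisticRanges` and the `id` column name are hypothetical, the chunk size is fixed (no dynamic adjustment), and the exact boundary operators are a guess.

```go
package main

import "fmt"

// optimisticRanges is a hypothetical helper that lists the WHERE fragments an
// optimistic-style chunker could walk through for a numeric key named "id".
func optimisticRanges(minVal, maxVal, chunkSize int64) []string {
	if chunkSize < 1 {
		chunkSize = 1 // guard against a non-advancing loop
	}
	// Special first chunk: rows inserted below the cached minimum.
	ranges := []string{fmt.Sprintf("id < %d", minVal)}
	pointer := minVal
	for pointer < maxVal {
		upper := pointer + chunkSize
		ranges = append(ranges, fmt.Sprintf("id >= %d AND id < %d", pointer, upper))
		pointer = upper
	}
	// Special last chunk: rows inserted above the cached maximum.
	return append(ranges, fmt.Sprintf("id >= %d", pointer))
}

func main() {
	for _, r := range optimisticRanges(1, 25, 10) {
		fmt.Println(r)
	}
}
```

No query is needed to find the next pointer, which is why this approach is cheap for dense AUTO_INCREMENT keys and wasteful when the key space has large gaps.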

To a certain extent, the chunk size will automatically adjust to small gaps in the table as dynamic chunking adjusts to compensate for slightly faster copies. However, this is intentionally limited with dynamic chunking having a hard limit on the chunk size of 100,000 rows. It can also only expand the chunk size by 50% at a time. This helps prevent the scenario where quickly processed chunks (likely caused by table gaps) expand the chunk size too quickly, causing future chunks to be too large and causing QoS issues.
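
The two limits described above (a 50% cap on growth and a 100,000-row ceiling) can be sketched as a single recalculation step. This is a simplified illustration: `nextChunkSize` is a hypothetical function, the local constants mirror the values of `MaxDynamicStepFactor`, `MinDynamicRowSize`, and `MaxDynamicRowSize` listed under Constants, and scaling linearly from one observed duration is an assumption about how the target is derived.

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxStepFactor = 1.5    // mirrors MaxDynamicStepFactor
	minRows       = 10     // mirrors MinDynamicRowSize
	maxRows       = 100000 // mirrors MaxDynamicRowSize
)

// nextChunkSize is a hypothetical sketch: scale the current size by
// target/observed, cap growth at maxStepFactor, then clamp to [minRows, maxRows].
func nextChunkSize(current uint64, target, observed time.Duration) uint64 {
	if observed <= 0 {
		observed = time.Nanosecond // avoid dividing by zero
	}
	ideal := float64(current) * float64(target) / float64(observed)
	if limit := float64(current) * maxStepFactor; ideal > limit {
		ideal = limit
	}
	if ideal < minRows {
		return minRows
	}
	if ideal > maxRows {
		return maxRows
	}
	return uint64(ideal)
}

func main() {
	// A chunk of 1000 rows finished in 100ms against a 500ms target.
	fmt.Println(nextChunkSize(1000, 500*time.Millisecond, 100*time.Millisecond))
}
```

With these inputs the ideal size would be 5000 rows, but the step cap holds the result to 1500, so a run of unusually fast chunks cannot inflate the chunk size in a single step.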

To deal with large gaps, the optimistic chunker also supports a special "prefetching mode". Prefetching mode is enabled when the chunk size has already reached the 100,000 row limit, and each chunk is still only taking 20% of the target time for chunk copying. Prefetching was first developed when we discovered a user with approximately 20 million rows in the table but a large gap between the AUTO_INCREMENT value of 20 million and the end of the table (300 billion). You can think of prefetching mode as similar to how the composite chunker works, as it will perform a SELECT query to find the next PRIMARY KEY value it should use as a pointer. Prefetching is automatically disabled again if the chunk size is ever reduced below the 100,000 row limit.
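
The trigger for prefetching mode can be written as a small predicate. This is only a sketch of the condition as described above: `shouldPrefetch` is a hypothetical name, and treating "20% of the target time" as a strict less-than comparison is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

const maxDynamicRows = 100000 // mirrors MaxDynamicRowSize

// shouldPrefetch is a hypothetical predicate: the chunk size is already at the
// row limit, yet chunks still finish in under a fifth of the target time.
func shouldPrefetch(chunkSize uint64, target, observed time.Duration) bool {
	return chunkSize >= maxDynamicRows && observed < target/5
}

func main() {
	fmt.Println(shouldPrefetch(100000, 500*time.Millisecond, 50*time.Millisecond))
	fmt.Println(shouldPrefetch(50000, 500*time.Millisecond, 50*time.Millisecond))
}
```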

MappedChunker Interface

The MappedChunker interface extends Chunker for chunkers that operate on a single source→target table pair. It adds:

  • ColumnMapping() — returns the ColumnMapping between source and target tables
  • KeyAboveHighWatermark() / KeyBelowLowWatermark() — watermark optimizations for binlog filtering

The optimistic and composite chunkers implement MappedChunker. The multi chunker does not, because it wraps multiple independent table pairs.

ColumnMapping

ColumnMapping describes the column relationship between a source and target table, including any column renames. It is created via NewColumnMapping(source, target, renames) where renames is an optional map[string]string of old→new column names.

It provides:

  • Columns() — returns the comma-separated source and target column lists for INSERT ... SELECT and REPLACE statements
  • ColumnsSlice() — returns the column lists as slices
  • ChecksumExprs() — returns expressions for CRC32-based checksum comparison, handling type casting and renamed columns
  • SourceTable() / TargetTable() — returns the source/target TableInfo
  • SourceColumnIndices() / SourceOrdinalIndices() — returns column index maps for binlog row processing

When no renames are specified, ColumnMapping produces identical output to the previous IntersectNonGeneratedColumns approach. With renames, it correctly maps old column names in the source to new column names in the target for all SQL generation.
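
To make the rename handling concrete, here is a minimal sketch of a column intersection that honors an old→new rename map. It does not use the package's types: `mapColumns` is a hypothetical function operating on plain string slices, it ignores generated columns, and the `", "` separator is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// mapColumns is a hypothetical helper. It keeps each source column whose
// (possibly renamed) counterpart exists in the target, and returns parallel
// backtick-quoted lists for the source and target sides of an INSERT ... SELECT.
func mapColumns(source, target []string, renames map[string]string) (src, tgt string) {
	inTarget := make(map[string]bool, len(target))
	for _, c := range target {
		inTarget[c] = true
	}
	var srcCols, tgtCols []string
	for _, c := range source {
		name := c
		if renamed, ok := renames[c]; ok {
			name = renamed
		}
		if inTarget[name] {
			srcCols = append(srcCols, "`"+c+"`")
			tgtCols = append(tgtCols, "`"+name+"`")
		}
	}
	return strings.Join(srcCols, ", "), strings.Join(tgtCols, ", ")
}

func main() {
	src, tgt := mapColumns(
		[]string{"id", "name", "legacy"},
		[]string{"id", "full_name"},
		map[string]string{"name": "full_name"},
	)
	fmt.Printf("INSERT INTO new_t (%s) SELECT %s FROM old_t\n", tgt, src)
}
```

Passing a nil rename map yields identical source and target lists, which matches the no-rename behavior described above.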

Multi Chunker

The multi chunker is a wrapper that coordinates multiple chunkers for parallel table migrations. It is used when chunking multiple tables simultaneously (such as in an atomic schema change, or move operation).

Its implementation works as follows:

  • Wraps multiple child chunkers (typically one per table being migrated).
  • Distributes Next() calls to the chunker that has made the least progress by percentage.
  • Routes Feedback() calls to the appropriate child chunker based on the chunk's table.
  • Aggregates progress across all child chunkers.
  • Handles checkpointing by serializing watermarks from all child chunkers into a JSON map.

The multi chunker uses a progress-based scheduling algorithm to ensure balanced progress across all tables. When Next() is called, it selects the chunker with the lowest completion percentage. If multiple chunkers have the same percentage, it prioritizes the one with more total rows expected. This approach prevents one small table from completing while larger tables lag behind, ensuring more predictable overall migration times.
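
The selection rule can be sketched as follows. This is an illustration, not the multi chunker's code: the `child` struct and `pickNext` are hypothetical, and the `done` flag stands in for whatever the real chunkers report through `IsRead()`.

```go
package main

import "fmt"

// child is a hypothetical snapshot of one wrapped chunker's progress.
type child struct {
	name   string
	copied uint64
	total  uint64
	done   bool
}

// pickNext returns the index of the unfinished child with the lowest
// completion percentage, preferring the larger table on a tie.
// It returns -1 when every child is done.
func pickNext(children []child) int {
	best := -1
	bestPct := 0.0
	for i, c := range children {
		if c.done {
			continue
		}
		pct := 0.0
		if c.total > 0 {
			pct = float64(c.copied) / float64(c.total)
		}
		if best == -1 || pct < bestPct || (pct == bestPct && c.total > children[best].total) {
			best, bestPct = i, pct
		}
	}
	return best
}

func main() {
	children := []child{
		{name: "small", copied: 50, total: 100},
		{name: "big", copied: 100, total: 1000},
		{name: "mid", copied: 10, total: 100},
	}
	fmt.Println(children[pickNext(children)].name)
}
```

Here "big" and "mid" are both 10% complete, so the tie goes to "big" because it expects more rows.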

For checkpointing and recovery, the multi chunker serializes each child chunker's watermark into a JSON map keyed by table name. During recovery with OpenAtWatermark(), tables that have watermarks resume from their checkpoint, while tables without watermarks (which weren't ready when the checkpoint was saved) start from scratch using Open().
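
The recovery decision can be sketched with a plain `map[string]string` encoded as JSON. This is a hedged illustration only: the real watermark payloads and key format are not shown here, and `serializeWatermarks` and `resumePlan` are hypothetical helpers that return a description of the call each table would receive.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serializeWatermarks encodes per-table watermarks as a JSON object keyed by
// table name (hypothetical helper; the watermark values are opaque strings).
func serializeWatermarks(w map[string]string) (string, error) {
	b, err := json.Marshal(w)
	return string(b), err
}

// resumePlan decides, per table, whether to resume from a saved watermark or
// start from scratch because the checkpoint holds no entry for that table.
func resumePlan(checkpoint string, tables []string) (map[string]string, error) {
	var saved map[string]string
	if err := json.Unmarshal([]byte(checkpoint), &saved); err != nil {
		return nil, err
	}
	plan := make(map[string]string, len(tables))
	for _, t := range tables {
		if wm, ok := saved[t]; ok {
			plan[t] = "OpenAtWatermark(" + wm + ")"
		} else {
			plan[t] = "Open()"
		}
	}
	return plan, nil
}

func main() {
	cp, err := serializeWatermarks(map[string]string{"test.t1": "wm1"})
	if err != nil {
		panic(err)
	}
	plan, err := resumePlan(cp, []string{"test.t1", "test.t2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(cp, plan["test.t1"], plan["test.t2"])
}
```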

Documentation

Overview

Package table contains common utilities for working with tables, such as the 'Chunker' feature.

Constants

const (
	// StartingChunkSize is the initial chunkSize
	StartingChunkSize = 1000
	// MaxDynamicStepFactor is the maximum amount each recalculation of the dynamic chunkSize can
	// increase by. For example, if the newTarget is 5000 but the current target is 1000, the newTarget
	// will be capped back down to 1500. Over time the number 5000 will be reached, but not straight away.
	MaxDynamicStepFactor = 1.5
	// MinDynamicRowSize is the minimum chunkSize that can be used when dynamic chunkSize is enabled.
	// This helps prevent a scenario where the chunk size is too small (it can never be less than 1).
	MinDynamicRowSize = 10
	// MaxDynamicRowSize is the max allowed chunkSize that can be used when dynamic chunkSize is enabled.
	// This seems like a safe upper bound for now
	MaxDynamicRowSize = 100000
	// DynamicPanicFactor is the factor by which the feedback process takes immediate action when
	// the chunkSize appears to be too large. For example, if the PanicFactor is 5, and the target *time*
	// is 50ms, an actual time 250ms+ will cause the dynamic chunk size to immediately be reduced.
	DynamicPanicFactor = 5

	// ChunkerDefaultTarget is the default chunker target
	ChunkerDefaultTarget = 100 * time.Millisecond
)
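
The panic threshold described in the `DynamicPanicFactor` comment amounts to one comparison. The sketch below assumes the check is "actual time is at least factor × target"; `exceedsPanicThreshold` is a hypothetical name, and what the chunker does once the threshold is crossed is not modeled.

```go
package main

import (
	"fmt"
	"time"
)

const panicFactor = 5 // mirrors DynamicPanicFactor

// exceedsPanicThreshold reports whether a chunk took long enough that the
// chunk size should be reduced immediately (hypothetical helper).
func exceedsPanicThreshold(target, actual time.Duration) bool {
	return actual >= panicFactor*target
}

func main() {
	fmt.Println(exceedsPanicThreshold(50*time.Millisecond, 250*time.Millisecond))
	fmt.Println(exceedsPanicThreshold(50*time.Millisecond, 249*time.Millisecond))
}
```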

Variables

var (
	ErrTableIsRead        = errors.New("table is read")
	ErrTableNotOpen       = errors.New("please call Open() first")
	ErrUnsupportedPKType  = errors.New("unsupported primary key type")
	ErrWatermarkNotReady  = errors.New("watermark not yet ready")
	ErrChunkerNotOpen     = errors.New("chunker is not open, call Open() first")
	ErrChunkerAlreadyOpen = errors.New("table is already open, did you mean to call Reset()?")
)

Functions

func IsArchiveTable added in v0.11.3

func IsArchiveTable(name string) bool

IsArchiveTable returns true if the table name matches the archive naming convention: <name>_archive_YYYY, <name>_archive_YYYY_MM, or <name>_archive_YYYY_MM_DD.
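
The naming convention can be expressed as a regular expression. The following is a sketch of a pattern that matches the three documented forms, not the package's implementation: `matchesArchiveConvention` is a hypothetical function, and requiring a non-empty name prefix is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
)

// archivePattern matches <name>_archive_YYYY, optionally followed by _MM and _DD.
var archivePattern = regexp.MustCompile(`^.+_archive_\d{4}(_\d{2}(_\d{2})?)?$`)

// matchesArchiveConvention is a hypothetical re-implementation for illustration.
func matchesArchiveConvention(name string) bool {
	return archivePattern.MatchString(name)
}

func main() {
	for _, n := range []string{"orders_archive_2024", "orders_archive_2024_05_17", "orders"} {
		fmt.Println(n, matchesArchiveConvention(n))
	}
}
```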

func LazyFindP90

func LazyFindP90(a []time.Duration) time.Duration

LazyFindP90 finds the second to last value in a slice. This is the same as a p90 if there are 10 values, but if there were 100 values it would technically be a p99 etc.
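
A minimal sketch of the "second to last value" idea follows. It assumes the durations are sorted first and that the input is left unmodified; both are assumptions, and `secondToLast` and `sampleTimings` are hypothetical names rather than the package's code.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// secondToLast sorts a copy of the durations ascending and returns the
// second-largest value, which equals the p90 when there are exactly 10 samples.
func secondToLast(a []time.Duration) time.Duration {
	if len(a) == 0 {
		return 0
	}
	sorted := append([]time.Duration(nil), a...) // copy so the caller's slice is untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	if len(sorted) == 1 {
		return sorted[0]
	}
	return sorted[len(sorted)-2]
}

// sampleTimings returns ten made-up chunk durations (10ms to 100ms, unordered).
func sampleTimings() []time.Duration {
	ms := []int{40, 10, 100, 70, 20, 90, 30, 60, 50, 80}
	out := make([]time.Duration, 0, len(ms))
	for _, m := range ms {
		out = append(out, time.Duration(m)*time.Millisecond)
	}
	return out
}

func main() {
	fmt.Println(secondToLast(sampleTimings()))
}
```

Ignoring the single slowest sample keeps one outlier chunk from dominating the estimate.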

func QuoteColumns

func QuoteColumns(cols []string) string

func StripAutoIncrement added in v0.11.3

func StripAutoIncrement(stmt string) string

StripAutoIncrement removes the AUTO_INCREMENT=N table option from a CREATE TABLE statement. This is useful when comparing schemas to avoid spurious diffs caused by differing auto-increment counters.
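
One way to implement this kind of stripping is a regular expression that targets only the `AUTO_INCREMENT=N` table option. This is a sketch under that assumption, with the hypothetical name `stripAutoIncrementOption`; it is not necessarily how the package does it.

```go
package main

import (
	"fmt"
	"regexp"
)

// tableOption matches the table-level AUTO_INCREMENT=N option (with its leading
// whitespace). The column attribute AUTO_INCREMENT has no "=N" and is untouched.
var tableOption = regexp.MustCompile(`(?i)\s+AUTO_INCREMENT\s*=\s*\d+`)

// stripAutoIncrementOption is a hypothetical helper for illustration.
func stripAutoIncrementOption(stmt string) string {
	return tableOption.ReplaceAllString(stmt, "")
}

func main() {
	stmt := "CREATE TABLE t (id INT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)) ENGINE=InnoDB AUTO_INCREMENT=42 DEFAULT CHARSET=utf8mb4"
	fmt.Println(stripAutoIncrementOption(stmt))
}
```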

func WatermarkAboveClause added in v0.13.0

func WatermarkAboveClause(ti *TableInfo, watermarkJSON string) (string, error)

WatermarkAboveClause parses a watermark JSON string (as produced by GetLowWatermark/checkpoint) and returns a SQL WHERE clause that matches rows strictly above the watermark's upper bound. This is used by the move path to delete rows above the watermark from target tables before resuming, so that the keyAboveWatermark optimization is safe without needing to read the target table's max value.

For example, if the watermark upper bound is id=100, this returns something like "`id` > 100".

Types

type Boundary

type Boundary struct {
	Value     []Datum
	Inclusive bool
}

Boundary is used by Chunk for its lower or upper boundary.

func (*Boundary) JSON

func (b *Boundary) JSON() string

JSON encodes a boundary as JSON. The values are represented as strings, to avoid JSON float behavior. See Issue #125
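
The "JSON float behavior" concern can be reproduced with the standard library alone. The sketch below shows what happens when a large integer key is decoded as a JSON number versus a JSON string; `decodeAsNumbers` and `decodeAsStrings` are hypothetical helpers, and the link to Issue #125 is presumed rather than confirmed.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeAsNumbers decodes a JSON array into []any, so numbers become float64.
func decodeAsNumbers(doc string) ([]any, error) {
	var v []any
	err := json.Unmarshal([]byte(doc), &v)
	return v, err
}

// decodeAsStrings decodes a JSON array of strings, preserving every digit.
func decodeAsStrings(doc string) ([]string, error) {
	var v []string
	err := json.Unmarshal([]byte(doc), &v)
	return v, err
}

func main() {
	nums, err := decodeAsNumbers(`[9007199254740993]`)
	if err != nil {
		panic(err)
	}
	strs, err := decodeAsStrings(`["9007199254740993"]`)
	if err != nil {
		panic(err)
	}
	// 9007199254740993 is 2^53+1, which float64 cannot represent exactly.
	fmt.Printf("as number: %.0f\nas string: %s\n", nums[0], strs[0])
}
```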

type Chunk

type Chunk struct {
	Key                  []string
	ChunkSize            uint64
	LowerBound           *Boundary
	UpperBound           *Boundary
	AdditionalConditions string
	Table                *TableInfo     // Source table information for this chunk
	NewTable             *TableInfo     // Destination table information for this chunk
	ColumnMapping        *ColumnMapping // Column relationship between source and target, including renames
}

Chunk is returned by Chunker.Next(). Applications can use it to iterate over the rows.

func (*Chunk) JSON

func (c *Chunk) JSON() string

func (*Chunk) String

func (c *Chunk) String() string

String stringifies a chunk into a fragment that can be used in a WHERE clause, e.g. pk > 100 and pk < 200.

type Chunker

type Chunker interface {
	Open() error
	IsRead() bool
	Close() error
	Next() (*Chunk, error)
	Feedback(chunk *Chunk, duration time.Duration, actualRows uint64)
	Progress() (rowsRead uint64, chunksCopied uint64, totalRowsExpected uint64)
	OpenAtWatermark(watermark string) error
	GetLowWatermark() (watermark string, err error)
	// Reset resets the chunker to start from the beginning, as if Open() was just called.
	// This is used when retrying operations like checksums.
	Reset() error
	// Tables returns the list of tables.
	// By convention the first table is the "current" table,
	// and the second table (if any) is the "new" table.
	// There could be more than 2 tables in the case of multi-chunker.
	// In which case every second table is the "new" table, etc.
	Tables() []*TableInfo
}

func NewMultiChunker

func NewMultiChunker(c ...Chunker) Chunker

NewMultiChunker creates a new multi-chunker that wraps multiple chunkers

type ChunkerConfig added in v0.13.0

type ChunkerConfig struct {
	// NewTable is the destination table. If nil, defaults to the source table
	// (for move operations where there is no distinct new table).
	NewTable *TableInfo
	// TargetChunkTime is the target duration for each chunk. Defaults to ChunkerDefaultTarget.
	TargetChunkTime time.Duration
	// Logger is the structured logger. Defaults to slog.Default().
	Logger *slog.Logger
	// ColumnMapping describes the column relationship between source and target tables,
	// including any renames. If nil, a default mapping with no renames is created.
	ColumnMapping *ColumnMapping
	// Key and Where are used for composite chunkers to specify a non-primary key index.
	// When Key is set, the composite chunker is always used regardless of whether the
	// table has an auto-increment primary key.
	Key   string
	Where string
}

ChunkerConfig holds optional configuration for creating a Chunker. Only the source table (passed as the first argument to NewChunker) is required; all other fields have sensible defaults.

type ColumnMapping added in v0.13.0

type ColumnMapping struct {
	// contains filtered or unexported fields
}

ColumnMapping represents the column relationship between a source and target table, including any column renames. It is computed once and shared across chunker, copier, applier, checksum, and repl subscription.

A nil *ColumnMapping is safe to use — all methods return sensible defaults (empty columns, nil renames).

func NewColumnMapping added in v0.13.0

func NewColumnMapping(source, target *TableInfo, renames map[string]string) *ColumnMapping

NewColumnMapping creates a ColumnMapping between source and target tables, with an optional rename map (old→new). The column intersection is computed immediately. If target is nil, source is used as the target.

func (*ColumnMapping) ChecksumExprs added in v0.13.0

func (m *ColumnMapping) ChecksumExprs() (source, target string, err error)

ChecksumExprs returns two comma-separated checksum column expressions for source and target, wrapping each column in IFNULL(), ISNULL() and CAST. The CAST type always comes from the target table's type definition. When there are no renames, both expressions are identical.

func (*ColumnMapping) Columns added in v0.13.0

func (m *ColumnMapping) Columns() (source, target string)

Columns returns two comma-separated, backtick-quoted column lists for source and target. When there are no renames, both strings are identical.

func (*ColumnMapping) ColumnsSlice added in v0.13.0

func (m *ColumnMapping) ColumnsSlice() (sourceColumns, targetColumns []string)

ColumnsSlice returns parallel slices of source and target column names. sourceColumns[i] corresponds to targetColumns[i].

func (*ColumnMapping) Renames added in v0.13.0

func (m *ColumnMapping) Renames() map[string]string

Renames returns the column rename mapping (old→new), or nil if there are none.

func (*ColumnMapping) SourceColumnIndices added in v0.13.0

func (m *ColumnMapping) SourceColumnIndices() []int

SourceColumnIndices returns the indices into sourceTable.NonGeneratedColumns for each intersected column. This is used when row data only contains non-generated columns (e.g., from SELECT statements).

func (*ColumnMapping) SourceOrdinalIndices added in v0.13.0

func (m *ColumnMapping) SourceOrdinalIndices() []int

SourceOrdinalIndices returns the indices into sourceTable.Columns (all columns, including generated) for each intersected column. This is needed when working with binlog row images, which contain ALL columns including generated ones.

func (*ColumnMapping) SourceTable added in v0.13.0

func (m *ColumnMapping) SourceTable() *TableInfo

SourceTable returns the source table.

func (*ColumnMapping) TargetTable added in v0.13.0

func (m *ColumnMapping) TargetTable() *TableInfo

TargetTable returns the target table.

type Datum

type Datum struct {
	Val any
	Tp  datumTp // signed, unsigned, binary
	// contains filtered or unexported fields
}

Datum could be a binary string, uint64 or int64.

func NewDatum

func NewDatum(val any, tp datumTp) (Datum, error)

func NewDatumFromValue added in v0.10.1

func NewDatumFromValue(value any, mysqlType string) (Datum, error)

NewDatumFromValue creates a Datum from a value and MySQL column type. This is useful for converting values from the database driver (which may be []byte, int, string, etc.) into a Datum that can be formatted as SQL.

func NewNilDatum

func NewNilDatum(tp datumTp) Datum

func (Datum) Add

func (d Datum) Add(addVal uint64) (Datum, error)

Add returns d + addVal. Returns an error if d is not numeric — callers that previously crashed on a binary-PK migration via the optimistic chunker's prefetch path now get a recoverable error and can checkpoint and exit cleanly.

func (Datum) GreaterThan

func (d Datum) GreaterThan(d2 Datum) (bool, error)

func (Datum) GreaterThanOrEqual

func (d Datum) GreaterThanOrEqual(d2 Datum) (bool, error)

func (Datum) IsBinaryString

func (d Datum) IsBinaryString() bool

func (Datum) IsNil

func (d Datum) IsNil() bool

func (Datum) IsNumeric

func (d Datum) IsNumeric() bool

IsNumeric reports whether the datum is signed or unsigned.

func (Datum) LessThan added in v0.11.0

func (d Datum) LessThan(d2 Datum) (bool, error)

func (Datum) LessThanOrEqual added in v0.11.0

func (d Datum) LessThanOrEqual(d2 Datum) (bool, error)

func (Datum) MaxValue

func (d Datum) MaxValue() Datum

func (Datum) MinValue

func (d Datum) MinValue() Datum

func (Datum) Range

func (d Datum) Range(d2 Datum) (uint64, error)

Range returns the diff between two datums as a uint64. Returns an error on non-numeric types for the same reason Add does.

func (Datum) String

func (d Datum) String() string

String returns the datum as a complete, self-contained SQL literal. Every return path is safe to inline directly into a SQL statement without further quoting or escaping by the caller:

  • NULL for IsNil()
  • the numeric literal (e.g. 42) for IsNumeric()
  • 0x... hex literal for IsBinaryString()
  • "..." with backslash escapes for everything else

The string-literal path runs sqlescape.EscapeString on the contents and wraps in double quotes, so callers like Chunk.String / expandRowConstructorComparison / applier UpsertRows can construct SQL by simple fmt.Sprintf concatenation. New code that has explicit error handling available may prefer a typed accessor, but the pre-escaped contract here is load-bearing for the migration's SQL emission paths.

It is also fmt.Stringer for log / debug output. The previous form panicked when a non-numeric datum's Val was not a string; this form coerces via %v so a misconstructed datum still produces a valid SQL fragment rather than crashing the migration. NewDatum always normalizes Val to string for binaryType/unknownType, so this coercion only fires for datums built by hand with an unexpected Val type.

type FeedbackCall

type FeedbackCall struct {
	Chunk      *Chunk
	Duration   time.Duration
	ActualRows uint64
	Timestamp  time.Time
}

type FilterOption added in v0.11.3

type FilterOption int

FilterOption controls which tables are excluded and which DDL transformations are applied when loading schema from a database.

const (
	// WithoutUnderscoreTables filters out tables whose name begins with "_".
	// This is commonly used to exclude Spirit's internal shadow/checkpoint tables
	// and other tool-generated temporary tables.
	WithoutUnderscoreTables FilterOption = iota + 1

	// WithoutArchiveTables filters out tables matching the archive naming convention:
	// <name>_archive_YYYY, <name>_archive_YYYY_MM, or <name>_archive_YYYY_MM_DD.
	WithoutArchiveTables

	// WithStrippedAutoIncrement removes the AUTO_INCREMENT=N table option from
	// CREATE TABLE statements. This is useful when comparing schemas to avoid
	// spurious diffs caused by differing auto-increment counters.
	WithStrippedAutoIncrement
)

type HashFunc added in v0.10.1

type HashFunc func(value any) (uint64, error)

HashFunc is a hash function that takes a single column value and returns a uint64 hash. This matches Vitess vindex behavior where the hash is used to determine shard placement. The hash value is then matched against key ranges to find the target shard.

type JSONBoundary

type JSONBoundary struct {
	Value     []string
	Inclusive bool
}

type JSONChunk

type JSONChunk struct {
	Key        []string
	ChunkSize  uint64
	LowerBound JSONBoundary
	UpperBound JSONBoundary
}

type MappedChunker added in v0.13.0

type MappedChunker interface {
	Chunker
	// ColumnMapping returns the column mapping between source and target tables,
	// including any column renames.
	ColumnMapping() *ColumnMapping
	KeyAboveHighWatermark(key0 any) bool
	KeyBelowLowWatermark(key0 any) bool
}

MappedChunker is a Chunker that operates on a single source→target table pair and carries a ColumnMapping describing the column relationship between them. The multiChunker does not implement this interface because it wraps multiple independent table pairs, each with their own mapping.

func NewChunker

func NewChunker(t *TableInfo, config ChunkerConfig) (MappedChunker, error)

NewChunker creates a new MappedChunker for the given source table. It selects the optimistic chunker for single-column auto-increment primary keys (unless Key/Where overrides are specified), and the composite chunker otherwise.

type MockChunker

type MockChunker struct {
	// contains filtered or unexported fields
}

MockChunker provides a controllable chunker for testing multi-chunker behavior

func NewMockChunker

func NewMockChunker(tableName string, totalRows uint64) *MockChunker

NewMockChunker creates a new mock chunker for testing

func (*MockChunker) Close

func (m *MockChunker) Close() error

func (*MockChunker) ColumnMapping added in v0.13.0

func (m *MockChunker) ColumnMapping() *ColumnMapping

func (*MockChunker) Feedback

func (m *MockChunker) Feedback(chunk *Chunk, duration time.Duration, actualRows uint64)

func (*MockChunker) GetCallCounts

func (m *MockChunker) GetCallCounts() (next, progress int)

func (*MockChunker) GetFeedbackCalls

func (m *MockChunker) GetFeedbackCalls() []FeedbackCall

func (*MockChunker) GetLowWatermark

func (m *MockChunker) GetLowWatermark() (string, error)

func (*MockChunker) IsRead

func (m *MockChunker) IsRead() bool

func (*MockChunker) KeyAboveHighWatermark

func (m *MockChunker) KeyAboveHighWatermark(key any) bool

KeyAboveHighWatermark returns true if the given key is above the current watermark. It returns FALSE in cases that are difficult to determine (e.g. non-numeric keys).

func (*MockChunker) KeyBelowLowWatermark

func (m *MockChunker) KeyBelowLowWatermark(key any) bool

KeyBelowLowWatermark returns true if the given key is below the current low watermark. It returns TRUE in cases that are difficult to determine (e.g. non-numeric keys).

func (*MockChunker) MarkAsComplete

func (m *MockChunker) MarkAsComplete()

func (*MockChunker) Next

func (m *MockChunker) Next() (*Chunk, error)

func (*MockChunker) Open

func (m *MockChunker) Open() error

Open is part of MockChunker's Chunker interface implementation.

func (*MockChunker) OpenAtWatermark

func (m *MockChunker) OpenAtWatermark(watermark string) error

func (*MockChunker) Progress

func (m *MockChunker) Progress() (uint64, uint64, uint64)

func (*MockChunker) Reset

func (m *MockChunker) Reset() error

func (*MockChunker) SetChunkSize

func (m *MockChunker) SetChunkSize(size uint64)

func (*MockChunker) SetCloseError

func (m *MockChunker) SetCloseError(err error)

func (*MockChunker) SetColumnMapping added in v0.13.0

func (m *MockChunker) SetColumnMapping(mapping *ColumnMapping)

func (*MockChunker) SetNextError

func (m *MockChunker) SetNextError(err error)

func (*MockChunker) SetOpenError

func (m *MockChunker) SetOpenError(err error)

SetOpenError is one of MockChunker's configuration methods.

func (*MockChunker) SetWatermarkError

func (m *MockChunker) SetWatermarkError(err error)

func (*MockChunker) SimulateProgress

func (m *MockChunker) SimulateProgress(percentage float64)

SimulateProgress is one of MockChunker's test helper methods.

func (*MockChunker) Tables

func (m *MockChunker) Tables() []*TableInfo

type Operator

type Operator string

Operator is used to compare values in a WHERE clause.

const (
	OpGreaterThan  Operator = ">"
	OpGreaterEqual Operator = ">="
	OpLessThan     Operator = "<"
	OpLessEqual    Operator = "<="
)

type ShardingMetadataProvider added in v0.10.1

type ShardingMetadataProvider interface {
	// GetShardingMetadata returns the sharding column name and hash function for a table.
	// Returns empty string and nil function if the table doesn't have sharding configuration.
	//
	// Parameters:
	//   - schemaName: The database/schema name
	//   - tableName: The table name
	//
	// Returns:
	//   - shardingColumn: The column name to use for sharding (e.g., "user_id")
	//   - hashFunc: The hash function to apply to the column value
	//   - error: Any error encountered while retrieving metadata
	GetShardingMetadata(schemaName, tableName string) (shardingColumn string, hashFunc HashFunc, err error)
}

ShardingMetadataProvider is an interface that provides sharding configuration for tables. This allows external systems (like Vitess) to provide sharding metadata without Spirit having direct dependencies on those systems.

The provider is called during table discovery to optionally configure sharding information for each table. If a table doesn't require sharding configuration (e.g., in simple MoveTables operations), the provider can return empty values.

Example implementation for Vitess:

type VitessShardingProvider struct {
    vschema *vschemapb.Keyspace
}

func (v *VitessShardingProvider) GetShardingMetadata(schemaName, tableName string) (string, HashFunc, error) {
    tableVindex := v.vschema.Tables[tableName]
    if tableVindex == nil {
        return "", nil, nil // No sharding for this table
    }
    primaryVindex := tableVindex.ColumnVindexes[0]
    shardingColumn := primaryVindex.Column
    hashFunc := func(value any) (uint64, error) {
        return vitessVindexHash(primaryVindex.Name, value)
    }
    return shardingColumn, hashFunc, nil
}

type TableInfo

type TableInfo struct {
	sync.Mutex

	EstimatedRows       uint64 // used by the composite chunker for Max
	SchemaName          string
	TableName           string
	QuotedTableName     string   // `table` - backtick-quoted table name without schema
	Columns             []string // all the column names
	NonGeneratedColumns []string // all the non-generated column names
	Indexes             []string // all the index names

	KeyColumns []string // the column names of the primaryKey

	KeyIsAutoInc bool // if pk[0] is an auto_increment column

	DisableAutoUpdateStatistics atomic.Bool

	// Host is an optional identifier for the MySQL server this table belongs to.
	// It is used by MultiChunker to disambiguate tables with the same SchemaName
	// and TableName on different servers (e.g., in N:M move operations).
	// When empty, the multi-chunker keys by SchemaName.TableName only.
	Host string

	// Sharding configuration (for ShardedApplier)
	// These are set per-table when using multi-table migrations with different sharding keys
	ShardingColumn string   // Column name to extract and hash (e.g., "user_id")
	HashFunc       HashFunc // Hash function: value -> uint64
	// contains filtered or unexported fields
}

func NewTableInfo

func NewTableInfo(db *sql.DB, schema, table string) *TableInfo

func (*TableInfo) AutoUpdateStatistics

func (t *TableInfo) AutoUpdateStatistics(ctx context.Context, interval time.Duration, logger *slog.Logger)

AutoUpdateStatistics runs a loop that updates the table statistics every interval. This will continue until Close() is called on the tableInfo, or t.DisableAutoUpdateStatistics is set to true.

func (*TableInfo) Close

func (t *TableInfo) Close() error

Close currently does nothing

func (*TableInfo) DB added in v0.13.0

func (t *TableInfo) DB() *sql.DB

DB returns the database connection associated with this table. This is used by components like the copier and checksum that need to read from the correct source database when multiple sources are in use.

func (*TableInfo) DecodeBinlogRow added in v0.14.0

func (t *TableInfo) DecodeBinlogRow(row []any) error

DecodeBinlogRow converts ENUM and SET values in a binlog row image from their integer wire format (ENUM ordinal / SET bitmask) back to the string form. Mutates the slice in place.

Why: the go-mysql binlog reader yields ENUM as int64 ordinals and SET as int64 bitmasks. Spirit's buffered replay path takes the row image verbatim and feeds it to the applier as a REPLACE INTO ... VALUES. If the target column has been migrated to a non-ENUM type (e.g. VARCHAR), MySQL inserts those integers as literal values instead of the original strings, corrupting data. Decoding here restores the string form before the applier sees it.

nil values (NULL columns) and rows with no ENUM/SET columns are a no-op. If the table has no ENUM/SET columns at all, callers should gate with HasEnumOrSetColumns and skip the call entirely.
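
The decoding itself follows MySQL's storage rules: ENUM ordinals are 1-based (0 is the empty-string error value), and each SET member corresponds to one bit, starting from the lowest. The sketch below illustrates those rules with hypothetical helpers `decodeEnum` and `decodeSet` that take the element list explicitly; how the package obtains the element list and walks the row is not shown.

```go
package main

import (
	"fmt"
	"strings"
)

// decodeEnum maps a 1-based ENUM ordinal to its string. Ordinal 0 (and any
// out-of-range value) maps to the empty string.
func decodeEnum(ordinal int64, elements []string) string {
	if ordinal < 1 || ordinal > int64(len(elements)) {
		return ""
	}
	return elements[ordinal-1]
}

// decodeSet expands a SET bitmask into the comma-separated member list,
// in definition order. Bit i corresponds to elements[i].
func decodeSet(bitmask int64, elements []string) string {
	var members []string
	for i, e := range elements {
		if bitmask&(1<<uint(i)) != 0 {
			members = append(members, e)
		}
	}
	return strings.Join(members, ",")
}

func main() {
	sizes := []string{"small", "medium", "large"}
	flags := []string{"a", "b", "c"}
	fmt.Println(decodeEnum(2, sizes), decodeSet(5, flags))
}
```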

func (*TableInfo) DescIndex

func (t *TableInfo) DescIndex(keyName string) ([]string, error)

DescIndex describes the columns in an index.

func (*TableInfo) GetColumnMySQLType

func (t *TableInfo) GetColumnMySQLType(col string) (string, bool)

GetColumnMySQLType returns the MySQL type for a given column name

func (*TableInfo) GetColumnOrdinal added in v0.10.1

func (t *TableInfo) GetColumnOrdinal(columnName string) (int, error)

GetColumnOrdinal returns the ordinal position (0-indexed) of a column by name. This is useful for extracting values from row slices where the position matters. Returns an error if the column is not found.

func (*TableInfo) GetNonGeneratedColumnOrdinal added in v0.10.2

func (t *TableInfo) GetNonGeneratedColumnOrdinal(columnName string) (int, error)

GetNonGeneratedColumnOrdinal returns the ordinal position (0-indexed) of a column by name within the NonGeneratedColumns slice. This is useful when working with row data that only contains non-generated columns (e.g., from SELECT statements that exclude generated columns). Returns an error if the column is not found or if it's a generated column.

func (*TableInfo) HasEnumOrSetColumns added in v0.14.0

func (t *TableInfo) HasEnumOrSetColumns() bool

HasEnumOrSetColumns reports whether any column on this table is an ENUM or SET. Used to skip the per-row decoding hot path when there's nothing to decode.

func (*TableInfo) MaxValue

func (t *TableInfo) MaxValue() Datum

MaxValue as a datum

func (*TableInfo) PrimaryKeyIsMemoryComparable

func (t *TableInfo) PrimaryKeyIsMemoryComparable() error

PrimaryKeyIsMemoryComparable checks that the PRIMARY KEY type is compatible. We no longer need this check for the chunker, since it can handle any type of key in the composite chunker. But the migration still needs to verify this, because of the delta map feature, which requires binary comparable keys.

func (*TableInfo) PrimaryKeyValues

func (t *TableInfo) PrimaryKeyValues(row any) ([]any, error)

PrimaryKeyValues helps extract the PRIMARY KEY from a row image. It uses our knowledge of the ordinal position of columns to find the position of primary key columns (there might be more than one). For minimal row image, you need to send the before image to extract the PK. This is because in the after image, the PK might be nil.

func (*TableInfo) QualifiedName added in v0.13.0

func (t *TableInfo) QualifiedName() string

QualifiedName returns a stable key for this table suitable for use in checkpoint watermarks. The format is "host.schema.table" when Host is set, or "schema.table" otherwise. This ensures uniqueness even when multiple servers have identically-named schemas and tables (N:M moves).

func (*TableInfo) SetInfo

func (t *TableInfo) SetInfo(ctx context.Context) error

SetInfo reads from MySQL metadata (usually infoschema) and sets the values in TableInfo.

type TableProgress added in v0.11.0

type TableProgress struct {
	TableName  string
	RowsCopied uint64
	RowsTotal  uint64
	IsComplete bool
}

TableProgress contains progress information for a single table

type TableSchema added in v0.11.1

type TableSchema struct {
	Name   string // Table name
	Schema string // CREATE TABLE DDL statement
}

TableSchema represents a table's name and its raw CREATE TABLE DDL statement. This is the common representation used across spirit, strata, and gap for passing schema information between components.

func LoadSchemaFromDB added in v0.11.1

func LoadSchemaFromDB(ctx context.Context, db *sql.DB, opts ...FilterOption) ([]TableSchema, error)

LoadSchemaFromDB retrieves all table schemas from the database using the provided connection. The returned tables and DDL are filtered according to the supplied options. With no options the raw DDL is returned unmodified.

Directories

Path Synopsis
Package asserty offers functionality to assert for certain DB properties.