checksum

package
v0.15.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 15, 2026 License: Apache-2.0 Imports: 15 Imported by: 0

README

Checksum

Checksums validate data consistency between two tables. During schema changes, this means comparing the original table with its _new counterpart. For move operations, checksums verify consistency between source and destination tables.

Key Features

  • Column mapping: The checksum uses ColumnMapping to determine which columns to compare between source and target tables. This handles the intersection of non-generated columns, column renames, and type casting automatically.
  • Type normalization: A CAST operation converts columns to a comparable type before comparison. This enables comparisons when data types have changed and their string representations differ (e.g., TIMESTAMP vs. TIMESTAMP(6)).
  • Automatic repair: When inconsistencies are detected, the checksum automatically repairs differences by recopying affected chunks.
  • Parallel execution: Checksums process chunks concurrently across multiple threads for efficient handling of large tables.
  • Consistent snapshot: A brief table lock is held while a consistent snapshot is established, and is then released. From that point on, the checksum is unaffected by concurrent modifications.
  • Server-side execution: The checksum computation is pushed down to MySQL, with each chunk returning only a CRC32 value and row count to Spirit. This minimizes network overhead and is significantly more efficient than approaches that extract all data for client-side comparison.

Why Checksums Matter

Checksums are a defensive feature against bugs. While Spirit is designed to correctly copy and apply data changes, subtle data corruption can occur during online operations in many ways.

Naive implementations that only compare row counts fail to catch most of these problems—validating the actual data is essential. Common issues include:

  • Trailing space handling: Storage engines and column types may handle trailing spaces inconsistently
  • Special character mangling: Character encoding issues can corrupt special characters during copy operations
  • Character set mishandling: Converting between character sets (e.g., latin1 → utf8mb4) can introduce subtle corruption
  • Timezone conversions: Timestamp values may be incorrectly converted between timezones
  • Lost updates: Race conditions or replication lag can cause updates to be missed during the copy process
  • Type conversion edge cases: Implicit type conversions may produce unexpected results (e.g., floating point precision)
  • NULL mangling: NULLs can be incorrectly replaced by empty strings during data operations

While we do our best to prevent such bugs, we also want to be pedantic when it comes to data integrity. In most cases we have observed, the checksum process takes about 10% as long as the copy-rows stage, which makes it an easy cost to justify.

There are also some known cases where a checksum failure is not a bug. These include adding a unique index on non-unique data, or a lossy data type conversion (e.g., VARCHAR(100) → VARCHAR(10) when records exist requiring more than 10 characters). Both are important cases to catch: in each, the checksum failure must prevent the cutover operation from executing.

Implementations

The checksum package contains two implementations:

  1. SingleChecker - Compares two tables on the same MySQL server (for schema changes, or 1:1 moves)
  2. DistributedChecker - Compares a source table against multiple distributed target databases (for sharded scenarios)

Both implementations use the same underlying checksum algorithm: CRC32 with XOR aggregation. This technique computes a checksum for each chunk of rows and can efficiently detect differences without comparing individual rows.

Checksum Algorithm

The checksum is computed using (simplified version):

SELECT BIT_XOR(CRC32(CONCAT(...))) as checksum, COUNT(*) as c 
FROM table 
WHERE <chunk_range>

This approach:

  • Computes a CRC32 hash for each row (using concatenated column values)
  • Aggregates the row checksums using XOR (BIT_XOR)
  • Provides both a checksum value and row count for verification
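The aggregation can be mimicked client-side to see why it works. The sketch below is a minimal Go model, assuming each row has already been rendered as the string CONCAT(...) would return (NULL handling and CASTs are omitted); chunkChecksum is a hypothetical name, not part of this package. MySQL's CRC32() is the zlib/IEEE CRC-32, the same function Go's crc32.ChecksumIEEE computes.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// chunkChecksum reproduces in Go what
//
//	SELECT BIT_XOR(CRC32(CONCAT(...))), COUNT(*) FROM t WHERE <chunk_range>
//
// computes on the server. Each row is assumed to be pre-rendered as the string
// CONCAT(...) would return for it; NULL handling and CASTs are omitted here.
func chunkChecksum(rows []string) (checksum uint32, count int) {
	for _, r := range rows {
		// MySQL's CRC32() uses the same IEEE polynomial as crc32.ChecksumIEEE.
		checksum ^= crc32.ChecksumIEEE([]byte(r))
		count++
	}
	return checksum, count
}

func main() {
	source := []string{"1alice", "2bob", "3carol"}
	reordered := []string{"3carol", "1alice", "2bob"} // same rows, different order
	mangled := []string{"1alice", "2Bob", "3carol"}   // one byte changed during copy

	s, sn := chunkChecksum(source)
	r, rn := chunkChecksum(reordered)
	m, mn := chunkChecksum(mangled)
	fmt.Println(s == r && sn == rn) // true: XOR is order-independent
	fmt.Println(s == m, sn == mn)   // false true: same count, different data
}
```

Because x XOR x = 0, identical rows cancel in pairs inside BIT_XOR, so the checksum alone cannot tell one copy of a row from three; returning COUNT(*) alongside it covers that blind spot.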

The actual implementation includes additional handling:

  • NULL normalization: Uses IFNULL() and ISNULL() to ensure NULLs are consistently represented
  • Type casting: Applies CAST operations to convert columns to the target table's type for comparable string representations

The CRC32 + XOR aggregate technique for table checksumming was pioneered by pt-table-checksum from Percona Toolkit, which established this as a reliable method for verifying data consistency in MySQL. This same approach has since been adopted by other database tools, including TiDB's data migration and verification utilities, demonstrating its effectiveness for distributed database scenarios.

Documentation

Overview

Package checksum provides online checksum functionality. Two tables on the same MySQL server can be compared with only an initial lock. It is not in the row/ package because it requires a replClient to be passed in, which would cause a circular dependency.

Constants

const (
	DefaultContinuousConcurrency     = 4
	DefaultContinuousRetryDelay      = time.Minute
	DefaultContinuousMaxQueueSize    = 1024
	DefaultContinuousTargetChunkTime = 1 * time.Second
)

Default values applied by NewContinuousChecker for zero-valued config fields. Exported so callers can reference them when tuning.

Variables

var (

	// ErrYieldTimeout is returned by runChecksum when the yield timeout expires.
	// This is distinct from the parent context being canceled, and signals that
	// the checksum should resume from the current watermark after releasing
	// long-running transactions to reduce HLL (history list length) growth.
	ErrYieldTimeout = errors.New("checksum yield timeout")

	// DefaultYieldTimeout is the default maximum duration for a single checksum
	// pass before yielding to release long-running REPEATABLE READ transactions.
	DefaultYieldTimeout = 24 * time.Hour
)
var ErrPermanentDivergence = errors.New("checksum: permanent divergence detected")

ErrPermanentDivergence is returned by Run when a chunk fails twice in a row with the source CRC unchanged AND no Recopier is configured — i.e. the target has data the source does not, the source is not racing, and the checker has no way to self-heal. With a Recopier configured this error is never returned: stable divergence triggers a Recopy and the chunk is counted in the per-pass "recopies" bucket.

This can technically produce a false positive if replication lag exceeds the retry delay: some changes may still be pending that the checker has not yet observed. The retry delay defaults to 1 minute for that reason.
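The decision described above can be sketched as a small classifier. This is a hypothetical model: the observation type, verdict values, and classify function are illustrative, and treating a changed source row count as "source moved" is an assumption of this sketch. It shows how an unchanged source CRC plus a still-wrong target leads either to a recopy or to ErrPermanentDivergence.

```go
package main

import "fmt"

// observation is one read of a chunk on both sides (hypothetical shape).
type observation struct {
	SrcCRC, TgtCRC     uint32
	SrcCount, TgtCount int64
}

type verdict string

const (
	converged  verdict = "converged"   // source and target now agree
	retryLater verdict = "retry-later" // source moved: likely racing writes or lag
	recopy     verdict = "recopy"      // stable divergence, Recopier configured
	permanent  verdict = "permanent"   // stable divergence, no Recopier: ErrPermanentDivergence
)

// classify decides what to do after re-reading a chunk that mismatched on its
// previous read (prev).
func classify(prev, cur observation, haveRecopier bool) verdict {
	if cur.SrcCRC == cur.TgtCRC && cur.SrcCount == cur.TgtCount {
		return converged
	}
	if cur.SrcCRC != prev.SrcCRC || cur.SrcCount != prev.SrcCount {
		return retryLater
	}
	if haveRecopier {
		return recopy
	}
	return permanent
}

func main() {
	prev := observation{SrcCRC: 0xAAAA, TgtCRC: 0xBBBB, SrcCount: 10, TgtCount: 11}
	fmt.Println(classify(prev, prev, false)) // permanent
	fmt.Println(classify(prev, prev, true))  // recopy
	moved := prev
	moved.SrcCRC = 0xCCCC
	fmt.Println(classify(prev, moved, false)) // retry-later
}
```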

Functions

This section is empty.

Types

type Checker

type Checker interface {
	// Run performs the checksum operation.
	Run(ctx context.Context) error
	GetProgress() string
	StartTime() time.Time
	ExecTime() time.Duration
	// DifferencesFound returns the number of chunks where a source/target
	// mismatch was detected during the most recent (or in-flight) pass.
	// Useful for callers that need to distinguish "clean cancellation" from
	// "cancellation while a fix may have been mid-flight" — the continuous-
	// checksum loop uses it to decide whether a sentinel-drop swallow is
	// safe.
	DifferencesFound() uint64
}

func NewChecker

func NewChecker(sourceDBs []*sql.DB, chunker table.Chunker, feeds []change.Source, config *CheckerConfig) (Checker, error)

NewChecker creates a new checksum object. sourceDBs contains the source database connections (one for single-source migrations, multiple for N:M moves). The distributed checker aggregates checksums across all sources. The single checker uses sourceDBs[0].

type CheckerConfig

type CheckerConfig struct {
	Concurrency     int
	TargetChunkTime time.Duration
	DBConfig        *dbconn.DBConfig
	Logger          *slog.Logger
	FixDifferences  bool
	Watermark       string // optional; defines a watermark to start from
	MaxRetries      int
	Applier         applier.Applier // optional; indicates it is a distributed checker
	YieldTimeout    time.Duration   // maximum duration for a single checksum pass before yielding to release long-running transactions
}

func NewCheckerDefaultConfig

func NewCheckerDefaultConfig() *CheckerConfig

type ContinuousChecker added in v0.15.0

type ContinuousChecker struct {
	// contains filtered or unexported fields
}

ContinuousChecker is the eventually-consistent checker. Construct via NewContinuousChecker; use Run to drive it until ctx is cancelled or a permanent failure surfaces. Concurrent calls to Stats and FirstCleanPass are safe at any time.

func NewContinuousChecker added in v0.15.0

func NewContinuousChecker(
	sourceDB, targetDB *sql.DB,
	chunker table.Chunker,
	feed change.Source,
	cfg ContinuousCheckerConfig,
) (*ContinuousChecker, error)

NewContinuousChecker constructs a checker with the given dependencies and config. sourceDB and targetDB must be distinct connections to the source and target databases respectively. chunker must be Open before Run; the checker Resets it between passes but does not close it.

func (*ContinuousChecker) FirstCleanPass added in v0.15.0

func (c *ContinuousChecker) FirstCleanPass() <-chan struct{}

FirstCleanPass returns a channel that is closed the first time a pass completes with every chunk READ-verified equal and zero recopies. A pass containing a recopy does not qualify: the repaired rows were never observed equal, so the signal waits for a follow-up pass that re-reads them (and everything else) with no repairs needed. The signal is monotonic: once closed it stays closed. Callers that need a "data is known consistent" gate should select on this channel. Safe to call concurrently with Run.

func (*ContinuousChecker) Run added in v0.15.0

func (c *ContinuousChecker) Run(ctx context.Context) error

Run drives the checker until ctx is cancelled or a permanent failure is detected. On ctx cancellation Run returns ctx.Err() (typically context.Canceled or context.DeadlineExceeded); callers that want to treat a clean shutdown as nil should filter that themselves (see how datasync.Runner.runContinuous does it). A permanent failure (a chunk that mismatched twice in a row with the source CRC unchanged, when no Recopier is configured) returns ErrPermanentDivergence. Errors from the chunker walker (chunker.Next failures) are wrapped and returned.

MaxQueueSize is a soft backpressure threshold rather than a hard cap: when the retry queue reaches it, the dispatcher stops reading fresh chunks from the walker until existing retries drain enough to make room. The walker blocks on its send; workers continue draining. WalkerStalls in the stats snapshot counts how often this has fired.
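The backpressure rule can be modeled in a few lines. This is a single-threaded simulation, not the real dispatcher: dispatcher, mayReadFresh, and simulate are hypothetical, every fresh chunk is assumed to mismatch, and a worker resolves one retry every other step. Unlike the real walker, which blocks on its send, the sketch counts one stall per refused step.

```go
package main

import "fmt"

// dispatcher is a hypothetical, single-threaded model of the dispatch loop.
type dispatcher struct {
	maxQueue     int
	retryQueue   []int // chunk IDs waiting for a delayed retry
	walkerStalls uint64
}

// mayReadFresh applies the soft cap: at or above maxQueue the walker is held
// back and the stall is counted; the run itself is never aborted.
func (d *dispatcher) mayReadFresh() bool {
	if len(d.retryQueue) >= d.maxQueue {
		d.walkerStalls++
		return false
	}
	return true
}

// simulate walks `chunks` fresh chunks, assuming each one mismatches and is
// queued for retry, while a worker resolves the oldest retry every other step.
func simulate(maxQueue, chunks int) uint64 {
	d := &dispatcher{maxQueue: maxQueue}
	next := 0
	for step := 0; next < chunks || len(d.retryQueue) > 0; step++ {
		if next < chunks && d.mayReadFresh() {
			d.retryQueue = append(d.retryQueue, next)
			next++
		}
		if step%2 == 1 && len(d.retryQueue) > 0 {
			d.retryQueue = d.retryQueue[1:] // worker drains the oldest retry
		}
	}
	return d.walkerStalls
}

func main() {
	fmt.Println(simulate(1, 4), simulate(100, 4)) // 3 0
}
```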

func (*ContinuousChecker) Stats added in v0.15.0

Stats returns a point-in-time snapshot of the checker's counters. Safe to call concurrently with Run.

type ContinuousCheckerConfig added in v0.15.0

type ContinuousCheckerConfig struct {
	// Concurrency is the number of worker goroutines. Default 4.
	Concurrency int

	// RetryDelay is the minimum wait between attempts for any given chunk —
	// measured from the *last* attempt of that chunk, not from the original
	// failure. Default 1m, because changes are queued in the replication
	// applier for 30s by default.
	RetryDelay time.Duration

	// MaxQueueSize is the soft cap on entries in the delayed-retry queue.
	// When reached, the dispatcher stops reading fresh chunks from the walker
	// until retries drain (counted in WalkerStalls); Run is not aborted.
	// Default 1024.
	MaxQueueSize int

	// TargetChunkTime, if set, is passed through to chunker feedback so the
	// walker tunes chunk size to roughly this duration. Default 1s.
	TargetChunkTime time.Duration

	// Recopier is invoked when the retry path detects stable target
	// divergence (src CRC unchanged across a retry window, target still
	// wrong). When nil, that condition surfaces as ErrPermanentDivergence
	// from Run — useful for tests and for callers that prefer to halt
	// rather than self-heal. Production sync callers should provide
	// MySQLRecopier.
	Recopier Recopier

	Logger *slog.Logger
}

ContinuousCheckerConfig configures a ContinuousChecker. See the DefaultContinuous* constants for the runtime defaults applied by the constructor when fields are zero.

type ContinuousCheckerStats added in v0.15.0

type ContinuousCheckerStats struct {
	// PassesCompleted is the number of passes finished so far. A pass
	// completes when every chunk has resolved (READ-verified or recopied);
	// only a pass with zero recopies counts as clean for the
	// FirstCleanPass signal.
	PassesCompleted uint64

	// CurrentPass is the 1-indexed pass number in flight (0 before the
	// first pass starts).
	CurrentPass uint64

	// ChunksThisPass is how many chunks the walker has emitted in the
	// current pass.
	ChunksThisPass uint64

	// ChunksPassedThisPass is how many chunks have gone clean in the
	// current pass (either initially or via retry).
	ChunksPassedThisPass uint64

	// MismatchesThisPass is how many chunks mismatched on their initial
	// (fresh-walk) read in the current pass and were enqueued for retry.
	// On a clean pass this equals PassedSecondAttemptThisPass +
	// PassedUnder5AttemptsThisPass + PassedUnder10AttemptsThisPass +
	// RecopiesThisPass — i.e. every chunk that needed at least one retry
	// to converge. Resets each pass.
	MismatchesThisPass uint64

	// Per-pass histogram of attempts-to-converge. "attempts" counts every
	// read of the chunk (initial fresh-walk + each retry). Buckets are
	// non-overlapping; their sum equals ChunksPassedThisPass on a clean
	// pass. All reset each pass.
	PassedFirstAttemptThisPass    uint64 // 1 attempt (no retry needed)
	PassedSecondAttemptThisPass   uint64 // 2 attempts (1 retry)
	PassedUnder5AttemptsThisPass  uint64 // 3-4 attempts
	PassedUnder10AttemptsThisPass uint64 // 5-9 attempts
	// RecopiesThisPass is the count of chunks that were recopied this
	// pass — i.e. retry detected stable target divergence (source CRC
	// unchanged across the retry window, target still wrong) and the
	// configured Recopier rewrote the chunk from source. Zero when no
	// Recopier is configured (those failures surface as
	// ErrPermanentDivergence and abort the run instead). A pass with
	// RecopiesThisPass > 0 cannot be the first clean pass — recopied
	// chunks are repaired, not verified, and are re-read on the next
	// pass before FirstCleanPass can fire.
	RecopiesThisPass uint64

	// RetryQueueDepth is the current size of the delayed-retry queue.
	RetryQueueDepth int

	// HotChunkCount is the number of entries currently in the retry queue
	// with consecutiveSrcChanged >= 2 — i.e. a chunk that has been observed
	// changing on the source across multiple retry windows.
	HotChunkCount int

	// WalkerStalls is the lifetime count of times the dispatcher refused
	// to read a fresh chunk from the walker because the retry queue was
	// already at MaxQueueSize. Each stall represents the checker holding
	// back the walker until existing retries drain enough to make room —
	// it does not abort the run. A persistently rising value means source
	// churn is outpacing the verifier (consider tuning MaxQueueSize,
	// Concurrency, or RetryDelay).
	WalkerStalls uint64

	// MismatchesDetected is the lifetime count of initial-read mismatches
	// (does not include re-failures within a single retry sequence).
	MismatchesDetected uint64

	// PermanentFailures is the lifetime count of chunks that failed twice
	// in a row with the source CRC unchanged. Run returns on the first such
	// event; this counter is bumped immediately before the error returns.
	PermanentFailures uint64

	// FirstCleanPassAt is the wall-clock time at which the first clean
	// pass completed (zero before that).
	FirstCleanPassAt time.Time
}

ContinuousCheckerStats is a snapshot of the checker's counters. All fields are point-in-time; for monotonic totals, sample successively.
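How a resolved chunk lands in the per-pass buckets can be sketched as follows. passHistogram and its methods are hypothetical; the bucket boundaries come from the field comments above, and the sketch deliberately leaves chunks needing ten or more attempts uncounted because the struct does not say where they go.

```go
package main

import "fmt"

// passHistogram mirrors the per-pass buckets of ContinuousCheckerStats.
type passHistogram struct {
	First, Second, Under5, Under10, Recopies uint64
}

// record files one resolved chunk. attempts counts every read of the chunk
// (the initial fresh-walk read plus each retry).
func (h *passHistogram) record(attempts int, recopied bool) {
	switch {
	case recopied:
		h.Recopies++
	case attempts == 1:
		h.First++
	case attempts == 2:
		h.Second++
	case attempts <= 4:
		h.Under5++
	case attempts <= 9:
		h.Under10++
	}
	// Chunks needing 10+ attempts are not covered by the documented buckets.
}

// mismatches counts every chunk that needed at least one retry, which the
// struct docs relate to MismatchesThisPass.
func (h passHistogram) mismatches() uint64 {
	return h.Second + h.Under5 + h.Under10 + h.Recopies
}

// passed is the verified total, corresponding to ChunksPassedThisPass.
func (h passHistogram) passed() uint64 {
	return h.First + h.Second + h.Under5 + h.Under10
}

func main() {
	var h passHistogram
	for _, a := range []int{1, 1, 1, 2, 3, 7} {
		h.record(a, false)
	}
	h.record(4, true) // stable divergence, rewritten by the Recopier
	fmt.Printf("%+v passed=%d mismatches=%d\n", h, h.passed(), h.mismatches())
	// prints: {First:3 Second:1 Under5:1 Under10:1 Recopies:1} passed=6 mismatches=4
}
```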

type DistributedChecker added in v0.10.1

type DistributedChecker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*DistributedChecker) ChecksumChunk added in v0.10.1

func (c *DistributedChecker) ChecksumChunk(ctx context.Context, chunk *table.Chunk) error

func (*DistributedChecker) DifferencesFound added in v0.14.0

func (c *DistributedChecker) DifferencesFound() uint64

DifferencesFound returns the number of chunks where a source/target mismatch was detected in the most recent (or in-flight) pass. Used by the continuous-checksum loop to decide whether a cancellation swallow is safe.

func (*DistributedChecker) ExecTime added in v0.10.1

func (c *DistributedChecker) ExecTime() time.Duration

func (*DistributedChecker) GetProgress added in v0.10.1

func (c *DistributedChecker) GetProgress() string

GetProgress returns the progress of the checker. This is really just a proxy to the chunker progress.

func (*DistributedChecker) Run added in v0.10.1

func (c *DistributedChecker) Run(ctx context.Context) error

func (*DistributedChecker) StartTime added in v0.10.1

func (c *DistributedChecker) StartTime() time.Time

type MySQLRecopier added in v0.15.0

type MySQLRecopier struct {
	// contains filtered or unexported fields
}

MySQLRecopier is the production Recopier used by `spirit sync`. Given a chunk that the continuous checker has identified as stably diverged (source CRC unchanged across the retry window, target still wrong), it rewrites the chunk's rows on the target from the source.

The operation is the cross-DB analog of SingleChecker.replaceChunk:

  1. DELETE the chunk's key range on the target.
  2. SELECT the chunk's rows from the source.
  3. Hand the rows to the applier's Apply method, which upserts them on the target through the same write path the change feed uses.

Recopies are serialized by an internal mutex. Concurrent DELETE + INSERT on overlapping chunks can deadlock on secondary indexes (see the transcript in single.go around line 211); serialization keeps the fix-up path safe at the cost of a small stall if multiple chunks need recopy at once. Since stable divergence is rare, this is acceptable.

The DELETE and Apply run under a context derived from context.WithoutCancel(ctx) so a parent cancellation between them does not leave the target with rows deleted but not yet rewritten. A bounded timeout (10 minutes) still protects against a hung Apply.
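The DELETE, SELECT, Apply sequence, the mutex, and the detached context can be sketched together. In this Go example (Go 1.21+ for context.WithoutCancel), the recopier type, its three hook functions, and the in-memory maps in demo are hypothetical stand-ins for the target DELETE, the source SELECT, and the applier. Checking ctx.Err() before the DELETE and running the SELECT under the detached context as well are choices of this sketch. demo cancels the parent context right after the DELETE to show the rewrite still completes.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type row struct {
	ID  int
	Val string
}

// chunk is a hypothetical key range [Lo, Hi).
type chunk struct{ Lo, Hi int }

// recopier sketches the documented shape with hypothetical hooks.
type recopier struct {
	mu          sync.Mutex // serializes recopies to avoid DELETE/INSERT deadlocks
	deleteRange func(ctx context.Context, c chunk) error
	selectRows  func(ctx context.Context, c chunk) ([]row, error)
	apply       func(ctx context.Context, rows []row) error
}

const recopyTimeout = 10 * time.Minute

func (r *recopier) Recopy(ctx context.Context, c chunk) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if err := ctx.Err(); err != nil {
		return err // cancelled before anything was deleted: safe to stop
	}
	// Ignore the caller's cancellation from here on, but keep a hard bound.
	opCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), recopyTimeout)
	defer cancel()
	if err := r.deleteRange(opCtx, c); err != nil {
		return fmt.Errorf("delete: %w", err)
	}
	rows, err := r.selectRows(opCtx, c)
	if err != nil {
		return fmt.Errorf("select: %w", err)
	}
	if err := r.apply(opCtx, rows); err != nil {
		return fmt.Errorf("apply: %w", err)
	}
	return nil
}

// demo repairs an in-memory target while the caller cancels mid-recopy.
func demo() (map[int]string, error) {
	src := map[int]string{1: "a", 2: "b"}
	tgt := map[int]string{1: "a", 2: "WRONG", 3: "extra"}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	r := &recopier{
		deleteRange: func(ctx context.Context, c chunk) error {
			for id := range tgt {
				if id >= c.Lo && id < c.Hi {
					delete(tgt, id)
				}
			}
			cancel() // parent cancelled right after the DELETE
			return ctx.Err()
		},
		selectRows: func(ctx context.Context, c chunk) ([]row, error) {
			var rows []row
			for id, v := range src {
				if id >= c.Lo && id < c.Hi {
					rows = append(rows, row{ID: id, Val: v})
				}
			}
			return rows, ctx.Err()
		},
		apply: func(ctx context.Context, rows []row) error {
			if err := ctx.Err(); err != nil {
				return err
			}
			for _, rw := range rows {
				tgt[rw.ID] = rw.Val
			}
			return nil
		},
	}
	err := r.Recopy(ctx, chunk{Lo: 1, Hi: 4})
	return tgt, err
}

func main() {
	tgt, err := demo()
	fmt.Println(tgt, err) // map[1:a 2:b] <nil>
}
```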

func NewMySQLRecopier added in v0.15.0

func NewMySQLRecopier(sourceDB, targetDB *sql.DB, app applier.Applier, dbConfig *dbconn.DBConfig, logger *slog.Logger) (*MySQLRecopier, error)

NewMySQLRecopier constructs a recopier for the source/target pair. The applier must be Started before Recopy is called (the production wiring in datasync.Runner starts the applier during the copy phase and leaves it running through continuous sync, so this is satisfied naturally).

func (*MySQLRecopier) Recopy added in v0.15.0

func (r *MySQLRecopier) Recopy(ctx context.Context, chunk *table.Chunk) error

Recopy rewrites the chunk's rows on the target from the source. See MySQLRecopier's struct doc for the operation's shape and concurrency rules.

type Recopier added in v0.15.0

type Recopier interface {
	Recopy(ctx context.Context, chunk *table.Chunk) error
}

Recopier knows how to overwrite a single chunk's worth of data on the target from the source. It is invoked when the continuous checker's retry path detects stable target divergence — i.e. the source CRC is unchanged across a retry window but the target CRC is still wrong.

Recopy must be safe to call concurrently from multiple worker goroutines; implementations are expected to serialize internally where needed (see MySQLRecopier for the production implementation).

type SingleChecker added in v0.10.1

type SingleChecker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*SingleChecker) ChecksumChunk added in v0.10.1

func (c *SingleChecker) ChecksumChunk(ctx context.Context, trxPool *dbconn.TrxPool, chunk *table.Chunk) error

func (*SingleChecker) DifferencesFound added in v0.14.0

func (c *SingleChecker) DifferencesFound() uint64

DifferencesFound returns the number of chunks where a source/target mismatch was detected in the most recent (or in-flight) pass. Used by the continuous-checksum loop to decide whether a cancellation swallow is safe.

func (*SingleChecker) ExecTime added in v0.10.1

func (c *SingleChecker) ExecTime() time.Duration

func (*SingleChecker) GetProgress added in v0.10.1

func (c *SingleChecker) GetProgress() string

GetProgress returns the progress of the checker. This is really just a proxy to the chunker progress.

func (*SingleChecker) Run added in v0.10.1

func (c *SingleChecker) Run(ctx context.Context) error

func (*SingleChecker) StartTime added in v0.10.1

func (c *SingleChecker) StartTime() time.Time
