Documentation
Overview
Package checksum provides online checksum functionality. Two tables on the same MySQL server can be compared with only an initial lock. It is not in the row/ package because it requires a replClient to be passed in, which would cause a circular dependency.
Index
- Constants
- Variables
- type Checker
- type CheckerConfig
- type ContinuousChecker
- type ContinuousCheckerConfig
- type ContinuousCheckerStats
- type DistributedChecker
- func (c *DistributedChecker) ChecksumChunk(ctx context.Context, chunk *table.Chunk) error
- func (c *DistributedChecker) DifferencesFound() uint64
- func (c *DistributedChecker) ExecTime() time.Duration
- func (c *DistributedChecker) GetProgress() string
- func (c *DistributedChecker) Run(ctx context.Context) error
- func (c *DistributedChecker) StartTime() time.Time
- type MySQLRecopier
- type Recopier
- type SingleChecker
- func (c *SingleChecker) ChecksumChunk(ctx context.Context, trxPool *dbconn.TrxPool, chunk *table.Chunk) error
- func (c *SingleChecker) DifferencesFound() uint64
- func (c *SingleChecker) ExecTime() time.Duration
- func (c *SingleChecker) GetProgress() string
- func (c *SingleChecker) Run(ctx context.Context) error
- func (c *SingleChecker) StartTime() time.Time
Constants
const (
DefaultContinuousConcurrency = 4
DefaultContinuousRetryDelay = time.Minute
DefaultContinuousMaxQueueSize = 1024
DefaultContinuousTargetChunkTime = 1 * time.Second
)
Default values applied by NewContinuousChecker for zero-valued config fields. Exported so callers can reference them when tuning.
Variables
var (
// ErrYieldTimeout is returned by runChecksum when the yield timeout expires.
// This is distinct from the parent context being canceled, and signals that
// the checksum should resume from the current watermark after releasing
// long-running transactions to reduce HLL (history list length) growth.
ErrYieldTimeout = errors.New("checksum yield timeout")
// DefaultYieldTimeout is the default maximum duration for a single checksum
// pass before yielding to release long-running REPEATABLE READ transactions.
DefaultYieldTimeout = 24 * time.Hour
)
var ErrPermanentDivergence = errors.New("checksum: permanent divergence detected")
ErrPermanentDivergence is returned by Run when a chunk fails twice in a row with the source CRC unchanged AND no Recopier is configured — i.e. the target has data the source does not, the source is not racing, and the checker has no way to self-heal. With a Recopier configured this error is never returned: stable divergence triggers a Recopy and the chunk is counted in the per-pass "recopies" bucket.
This can produce a false positive if replication lag exceeds the retry delay: changes may still be pending that the checker has not yet observed. The retry delay defaults to 1 minute for this reason.
Functions
This section is empty.
Types
type Checker
type Checker interface {
// Run performs the checksum operation.
Run(ctx context.Context) error
GetProgress() string
StartTime() time.Time
ExecTime() time.Duration
// DifferencesFound returns the number of chunks where a source/target
// mismatch was detected during the most recent (or in-flight) pass.
// Useful for callers that need to distinguish "clean cancellation" from
// "cancellation while a fix may have been mid-flight" — the continuous-
// checksum loop uses it to decide whether a sentinel-drop swallow is
// safe.
DifferencesFound() uint64
}
func NewChecker
func NewChecker(sourceDBs []*sql.DB, chunker table.Chunker, feeds []change.Source, config *CheckerConfig) (Checker, error)
NewChecker creates a new Checker. sourceDBs contains the source database connections (one for single-source migrations, multiple for N:M moves). The distributed checker aggregates checksums across all sources; the single checker uses only sourceDBs[0].
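One way checksums from several sources can be combined is to XOR per-row CRC32 values, which makes the result independent of row order and of which source holds which row. The sketch below assumes that scheme (mirroring the behavior of MySQL's BIT_XOR(CRC32(...)) aggregate); `chunkChecksum` and `aggregate` are hypothetical, and the exact query the DistributedChecker runs is not documented here.

```go
package main

import "hash/crc32"

// chunkChecksum XORs the CRC32 of each row, an order-independent combine,
// and returns the row count alongside it.
func chunkChecksum(rows []string) (sum uint32, count int) {
	for _, r := range rows {
		sum ^= crc32.ChecksumIEEE([]byte(r))
	}
	return sum, len(rows)
}

// aggregate combines per-source results for the same chunk key range.
// Because XOR is associative and commutative, it does not matter which
// source holds which rows.
func aggregate(perSource [][]string) (sum uint32, count int) {
	for _, rows := range perSource {
		s, n := chunkChecksum(rows)
		sum ^= s
		count += n
	}
	return sum, count
}
```

The row count is tracked alongside the XOR because identical rows cancel out under XOR; comparing counts catches some differences the XOR alone would miss.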
type CheckerConfig
type CheckerConfig struct {
Concurrency int
TargetChunkTime time.Duration
DBConfig *dbconn.DBConfig
Logger *slog.Logger
FixDifferences bool
Watermark string // optional; defines a watermark to start from
MaxRetries int
Applier applier.Applier // optional; indicates it is a distributed checker
YieldTimeout time.Duration // maximum duration for a single checksum pass before yielding to release long-running transactions
}
func NewCheckerDefaultConfig
func NewCheckerDefaultConfig() *CheckerConfig
type ContinuousChecker added in v0.15.0
type ContinuousChecker struct {
// contains filtered or unexported fields
}
ContinuousChecker is the eventually-consistent checker. Construct it via NewContinuousChecker and use Run to drive it until ctx is canceled or a permanent failure surfaces. Stats and FirstCleanPass are safe to call concurrently at any time.
func NewContinuousChecker added in v0.15.0
func NewContinuousChecker(sourceDB, targetDB *sql.DB, chunker table.Chunker, feed change.Source, cfg ContinuousCheckerConfig) (*ContinuousChecker, error)
NewContinuousChecker constructs a checker with the given dependencies and config. sourceDB and targetDB must be distinct connections to the source and target databases respectively. chunker must be Open before Run; the checker Resets it between passes but does not close it.
func (*ContinuousChecker) FirstCleanPass added in v0.15.0
func (c *ContinuousChecker) FirstCleanPass() <-chan struct{}
FirstCleanPass returns a channel that is closed the first time a pass completes with every chunk READ-verified equal and zero recopies. A pass containing a recopy does not qualify: the repaired rows were never observed equal, so the signal waits for a follow-up pass that re-reads them (and everything else) with no repairs needed. The signal is monotonic: once closed it stays closed. Callers that need a "data is known consistent" gate should select on this channel. Safe to call concurrently with Run.
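The monotonic, close-once behavior can be sketched with sync.Once. `cleanSignal` and `passDone` are hypothetical names; the sketch assumes the run loop reports each completed pass together with its recopy count.

```go
package main

import "sync"

// cleanSignal is a hypothetical latch: the channel is closed at most once
// and then stays closed.
type cleanSignal struct {
	once sync.Once
	ch   chan struct{}
}

func newCleanSignal() *cleanSignal { return &cleanSignal{ch: make(chan struct{})} }

// passDone is called at the end of every completed pass. Only a pass with
// zero recopies (every chunk READ-verified equal) can fire the signal.
func (s *cleanSignal) passDone(recopies uint64) {
	if recopies > 0 {
		return
	}
	s.once.Do(func() { close(s.ch) }) // later calls are no-ops, so no double close
}

// Done plays the role of FirstCleanPass; the channel is never reassigned,
// so reading it from other goroutines is safe.
func (s *cleanSignal) Done() <-chan struct{} { return s.ch }
```

A caller would typically wait with `select { case <-c.FirstCleanPass(): ...; case <-ctx.Done(): ... }` so it does not block forever if the checker never converges.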
func (*ContinuousChecker) Run added in v0.15.0
func (c *ContinuousChecker) Run(ctx context.Context) error
Run drives the checker until ctx is canceled or a permanent failure is detected. On ctx cancellation Run returns ctx.Err() (typically context.Canceled or context.DeadlineExceeded); callers that want to treat a clean shutdown as nil should filter that error themselves (see how datasync.Runner.runContinuous does it). A permanent failure, meaning a chunk that mismatched twice in a row with the source CRC unchanged while no Recopier is configured, causes Run to return ErrPermanentDivergence. Errors from the chunker walker (chunker.Next failures) are wrapped and returned.
MaxQueueSize is a soft backpressure threshold rather than a hard cap: when the retry queue reaches it, the dispatcher stops reading fresh chunks from the walker until existing retries drain enough to make room. The walker blocks on its send; workers continue draining. WalkerStalls in the stats snapshot counts how often this has fired.
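The soft-cap rule can be modeled in a single goroutine to make the stall accounting concrete. `dispatcher` and `step` are hypothetical; the real dispatcher runs concurrently with its workers, which this sketch collapses into one "drain a retry" step.

```go
package main

// dispatcher is a hypothetical, single-goroutine model of the soft
// backpressure rule: fresh chunks are only read while the retry queue is
// below maxQueue; otherwise the walker is left blocked and a stall counted.
type dispatcher struct {
	maxQueue int
	queue    []int // chunk IDs awaiting retry
	stalls   uint64
}

// step performs one dispatch decision. walker is the channel the walker
// sends fresh chunks on; mismatch reports whether a fresh chunk needs a retry.
func (d *dispatcher) step(walker <-chan int, mismatch func(int) bool) {
	if len(d.queue) >= d.maxQueue {
		d.stalls++            // hold the walker back instead of failing the run
		d.queue = d.queue[1:] // stands in for a worker draining one retry
		return
	}
	id := <-walker
	if mismatch(id) {
		d.queue = append(d.queue, id)
	}
}
```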
func (*ContinuousChecker) Stats added in v0.15.0
func (c *ContinuousChecker) Stats() ContinuousCheckerStats
Stats returns a point-in-time snapshot of the checker's counters. Safe to call concurrently with Run.
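A common way to make such a snapshot safe without a global lock is to keep each counter in a sync/atomic value and copy them into a plain struct. The sketch assumes that approach (Go 1.19+ for atomic.Uint64); `counters` and `statsSnapshot` are hypothetical and the checker's actual synchronization is not documented here.

```go
package main

import "sync/atomic"

// counters is a hypothetical set of live counters updated by workers.
type counters struct {
	passesCompleted    atomic.Uint64
	mismatchesDetected atomic.Uint64
	walkerStalls       atomic.Uint64
}

// statsSnapshot is a plain value copy, safe to hand to other goroutines.
type statsSnapshot struct {
	PassesCompleted, MismatchesDetected, WalkerStalls uint64
}

// snapshot loads each counter atomically. Each field is individually
// consistent; the set as a whole is point-in-time, not one atomic cut.
func (c *counters) snapshot() statsSnapshot {
	return statsSnapshot{
		PassesCompleted:    c.passesCompleted.Load(),
		MismatchesDetected: c.mismatchesDetected.Load(),
		WalkerStalls:       c.walkerStalls.Load(),
	}
}
```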
type ContinuousCheckerConfig added in v0.15.0
type ContinuousCheckerConfig struct {
// Concurrency is the number of worker goroutines. Default 4.
Concurrency int
// RetryDelay is the minimum wait between attempts for any given chunk —
// measured from the *last* attempt of that chunk, not from the original
// failure. Default 1m, because changes are queued in the replication
// applier for 30s by default.
RetryDelay time.Duration
// MaxQueueSize is a soft cap on entries in the delayed-retry queue. When
// the queue reaches it, the dispatcher stops reading fresh chunks from the
// walker until retries drain; the run is not aborted (see WalkerStalls).
// Default 1024.
MaxQueueSize int
// TargetChunkTime, if set, is passed through to chunker feedback so the
// walker tunes chunk size to roughly this duration. Default 1s.
TargetChunkTime time.Duration
// Recopier is invoked when the retry path detects stable target
// divergence (src CRC unchanged across a retry window, target still
// wrong). When nil, that condition surfaces as ErrPermanentDivergence
// from Run — useful for tests and for callers that prefer to halt
// rather than self-heal. Production sync callers should provide
// MySQLRecopier.
Recopier Recopier
Logger *slog.Logger
}
ContinuousCheckerConfig configures a ContinuousChecker. Zero-valued fields are replaced by the corresponding DefaultContinuous* constants when the checker is constructed.
type ContinuousCheckerStats added in v0.15.0
type ContinuousCheckerStats struct {
// PassesCompleted is the number of passes finished so far. A pass
// completes when every chunk has resolved (READ-verified or recopied);
// only a pass with zero recopies counts as clean for the
// FirstCleanPass signal.
PassesCompleted uint64
// CurrentPass is the 1-indexed pass number in flight (0 before the
// first pass starts).
CurrentPass uint64
// ChunksThisPass is how many chunks the walker has emitted in the
// current pass.
ChunksThisPass uint64
// ChunksPassedThisPass is how many chunks have gone clean in the
// current pass (either initially or via retry).
ChunksPassedThisPass uint64
// MismatchesThisPass is how many chunks mismatched on their initial
// (fresh-walk) read in the current pass and were enqueued for retry.
// Once the pass completes, this equals PassedSecondAttemptThisPass +
// PassedUnder5AttemptsThisPass + PassedUnder10AttemptsThisPass +
// RecopiesThisPass, i.e. every chunk that needed at least one retry
// to converge. Resets each pass.
MismatchesThisPass uint64
// Per-pass histogram of attempts-to-converge. "attempts" counts every
// read of the chunk (initial fresh-walk + each retry). Buckets are
// non-overlapping; their sum equals ChunksPassedThisPass on a clean
// pass. All reset each pass.
PassedFirstAttemptThisPass uint64 // 1 attempt (no retry needed)
PassedSecondAttemptThisPass uint64 // 2 attempts (1 retry)
PassedUnder5AttemptsThisPass uint64 // 3-4 attempts
PassedUnder10AttemptsThisPass uint64 // 5-9 attempts
// RecopiesThisPass is the count of chunks that were recopied this
// pass — i.e. retry detected stable target divergence (source CRC
// unchanged across the retry window, target still wrong) and the
// configured Recopier rewrote the chunk from source. Zero when no
// Recopier is configured (those failures surface as
// ErrPermanentDivergence and abort the run instead). A pass with
// RecopiesThisPass > 0 cannot be the first clean pass — recopied
// chunks are repaired, not verified, and are re-read on the next
// pass before FirstCleanPass can fire.
RecopiesThisPass uint64
// RetryQueueDepth is the current size of the delayed-retry queue.
RetryQueueDepth int
// HotChunkCount is the number of entries currently in the retry queue
// with consecutiveSrcChanged >= 2 — i.e. a chunk that has been observed
// changing on the source across multiple retry windows.
HotChunkCount int
// WalkerStalls is the lifetime count of times the dispatcher refused
// to read a fresh chunk from the walker because the retry queue was
// already at MaxQueueSize. Each stall represents the checker holding
// back the walker until existing retries drain enough to make room —
// it does not abort the run. A persistently rising value means source
// churn is outpacing the verifier (consider tuning MaxQueueSize,
// Concurrency, or RetryDelay).
WalkerStalls uint64
// MismatchesDetected is the lifetime count of initial-read mismatches
// (does not include re-failures within a single retry sequence).
MismatchesDetected uint64
// PermanentFailures is the lifetime count of chunks that failed twice
// in a row with the source CRC unchanged while no Recopier was
// configured. Run returns ErrPermanentDivergence on the first such
// event; this counter is bumped immediately before the error returns.
PermanentFailures uint64
// FirstCleanPassAt is the wall-clock time at which the first clean
// pass completed (zero before that).
FirstCleanPassAt time.Time
}
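ContinuousCheckerStats is a point-in-time snapshot of the checker's counters. The lifetime counters (PassesCompleted, WalkerStalls, MismatchesDetected, PermanentFailures) only increase, while the *ThisPass fields, RetryQueueDepth, and HotChunkCount describe current state; to track trends, call Stats repeatedly and compare successive snapshots.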
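The attempts-to-bucket mapping for the per-pass histogram can be written as a small switch. `bucket` is a hypothetical helper; the field names are the ones documented above. The struct comments do not say where a chunk that needs 10 or more attempts is counted, so the sketch reports it as unbucketed rather than guessing.

```go
package main

// bucket names the per-pass histogram field a converged chunk lands in,
// given how many reads it took (initial fresh-walk read plus retries).
func bucket(attempts int) string {
	switch {
	case attempts == 1:
		return "PassedFirstAttemptThisPass"
	case attempts == 2:
		return "PassedSecondAttemptThisPass"
	case attempts >= 3 && attempts <= 4:
		return "PassedUnder5AttemptsThisPass"
	case attempts >= 5 && attempts <= 9:
		return "PassedUnder10AttemptsThisPass"
	default:
		return "unbucketed" // not covered by the documented buckets
	}
}
```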
type DistributedChecker added in v0.10.1
func (*DistributedChecker) ChecksumChunk added in v0.10.1
func (c *DistributedChecker) ChecksumChunk(ctx context.Context, chunk *table.Chunk) error
func (*DistributedChecker) DifferencesFound added in v0.14.0
func (c *DistributedChecker) DifferencesFound() uint64
DifferencesFound returns the number of chunks where a source/target mismatch was detected in the most recent (or in-flight) pass. The continuous-checksum loop uses it to decide whether it is safe to swallow a cancellation error.
func (*DistributedChecker) ExecTime added in v0.10.1
func (c *DistributedChecker) ExecTime() time.Duration
func (*DistributedChecker) GetProgress added in v0.10.1
func (c *DistributedChecker) GetProgress() string
GetProgress returns the progress of the checker. It is a thin proxy for the chunker's progress.
func (*DistributedChecker) Run added in v0.10.1
func (c *DistributedChecker) Run(ctx context.Context) error
func (*DistributedChecker) StartTime added in v0.10.1
func (c *DistributedChecker) StartTime() time.Time
type MySQLRecopier added in v0.15.0
type MySQLRecopier struct {
// contains filtered or unexported fields
}
MySQLRecopier is the production Recopier used by `spirit sync`. Given a chunk that the continuous checker has identified as stably diverged (source CRC unchanged across the retry window, target still wrong), it rewrites the chunk's rows on the target from the source.
The operation is the cross-DB analog of SingleChecker.replaceChunk:
- DELETE the chunk's key range on the target.
- SELECT the chunk's rows from the source.
- Hand the rows to the applier's Apply method, which upserts them on the target through the same write path the change feed uses.
Recopies are serialized by an internal mutex. Concurrent DELETE + INSERT on overlapping chunks can deadlock on secondary indexes (see the transcript in single.go around line 211); serialization keeps the fix-up path safe at the cost of a small stall if multiple chunks need recopy at once. Since stable divergence is rare, this is acceptable.
The DELETE and Apply run under a context derived from context.WithoutCancel(ctx) so a parent cancellation between them does not leave the target with rows deleted but not yet rewritten. A bounded timeout (10 minutes) still protects against a hung Apply.
func NewMySQLRecopier added in v0.15.0
func NewMySQLRecopier(sourceDB, targetDB *sql.DB, app applier.Applier, dbConfig *dbconn.DBConfig, logger *slog.Logger) (*MySQLRecopier, error)
NewMySQLRecopier constructs a recopier for the source/target pair. The applier must be Started before Recopy is called (the production wiring in datasync.Runner starts the applier during the copy phase and leaves it running through continuous sync, so this is satisfied naturally).
type Recopier added in v0.15.0
Recopier knows how to overwrite a single chunk's worth of data on the target from the source. It is invoked when the continuous checker's retry path detects stable target divergence — i.e. the source CRC is unchanged across a retry window but the target CRC is still wrong.
Recopy must be safe to call concurrently from multiple worker goroutines; implementations are expected to serialize internally where needed (see MySQLRecopier for the production implementation).
type SingleChecker added in v0.10.1
func (*SingleChecker) ChecksumChunk added in v0.10.1
func (c *SingleChecker) ChecksumChunk(ctx context.Context, trxPool *dbconn.TrxPool, chunk *table.Chunk) error
func (*SingleChecker) DifferencesFound added in v0.14.0
func (c *SingleChecker) DifferencesFound() uint64
DifferencesFound returns the number of chunks where a source/target mismatch was detected in the most recent (or in-flight) pass. The continuous-checksum loop uses it to decide whether it is safe to swallow a cancellation error.
func (*SingleChecker) ExecTime added in v0.10.1
func (c *SingleChecker) ExecTime() time.Duration
func (*SingleChecker) GetProgress added in v0.10.1
func (c *SingleChecker) GetProgress() string
GetProgress returns the progress of the checker. It is a thin proxy for the chunker's progress.
func (*SingleChecker) Run
func (c *SingleChecker) Run(ctx context.Context) error
func (*SingleChecker) StartTime added in v0.10.1
func (c *SingleChecker) StartTime() time.Time