repl

package
v0.14.0 Latest
Published: May 20, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

README

Replication Client

The replication client tracks changes to tables by acting as a MySQL replica. The go-mysql library does most of the heavy lifting by connecting to MySQL and parsing binary log events. Spirit's role is to manage subscriptions for each table being migrated, deduplicate changes, and coordinate with the copier to avoid redundant work.

Each table tracked is represented by a subscription. There is a single subscription type — the buffered map — that stores the full row image from the binlog and applies it through the applier. For non-memory-comparable primary keys it falls back to a FIFO queue internally once the watermark optimization is disabled, but row images are still preserved and the applier path is still used.

Subscription Implementation

Background

Earlier versions of Spirit shipped two subscription types side-by-side: a deltaMap that stored only primary-key hashes (and re-read row state from the source via REPLACE INTO ... SELECT at flush time), and a deltaQueue that preserved binlog order for non-memory-comparable PKs. The split caused issue #746: MySQL's binlog-vs-visibility ordering meant that the deltaMap path could read a stale row image when its SELECT raced ahead of the row's commit visibility, applying the wrong final state.

The fix was to unify everything around a single subscription type — the buffered map — that captures the full row image from the binlog directly, so the applied state is the binlog state and the source-side SELECT race is gone. The deltaMap and deltaQueue types were removed entirely; the FIFO behaviour previously provided by deltaQueue now lives inside bufferedMap as an internal mode for non-memory-comparable PKs (see below).

Buffered Map

The buffered map stores the full row image directly from the binlog and applies it through the applier interface.

How it works:

  • Maintains a map of primaryKeyHash -> (isDelete, fullRowImage).
  • Multiple changes to the same row are automatically deduplicated (only the final state is stored).
  • Uses the applier's UpsertRows and DeleteKeys to write changes — there is no SELECT FROM original round-trip.
  • Flushes changes through the applier's parallel write workers.

Advantages:

  • Excellent deduplication: if a row is modified 100 times, only one upsert is performed.
  • Parallel flushing: independent keys can be written concurrently via the applier.
  • No source-side reads at flush: the row image is already in memory, so no contention with OLTP traffic on the source.
  • Sidesteps the binlog/visibility race: because the row image is the applied state, there is no opportunity for MySQL's binlog-vs-visibility ordering to surface a stale row (see issue #746). This also makes Spirit safe to run against sources configured with semi-synchronous replication, which can widen that window by tens or hundreds of milliseconds depending on replica ACK latency. The mysql-semisync-docker.yml CI lane exercises this configuration end-to-end.
  • Watermark optimization (when supported by the chunker): can skip ranges of keys using both KeyAboveHighWatermark and KeyBelowLowWatermark.
  • Cross-server compatibility: the applier can target a different MySQL server, which is what pkg/move relies on.

Limitations:

  • Requires binlog_row_image=FULL and an empty binlog_row_value_options (the applier needs the complete row image).
  • Higher memory usage than a key-only map: stores full row data for each changed key.
  • Watermark optimizations (KeyAboveHighWatermark and KeyBelowLowWatermark) are available on MappedChunker implementations (both optimistic and composite chunkers). They work correctly for numeric, binary, and temporal primary key types. For VARCHAR/TEXT columns with collations, Go's byte-order comparison may differ from MySQL's collation order; any discrepancies are caught by the checksum phase (see issue #479).

Map iteration order is irrelevant because the applier issues REPLACE INTO target VALUES (...), which deletes any row that conflicts on PRIMARY KEY or any UNIQUE index before each insert. That makes the multi-row VALUES list order-independent — see "Applier idempotence via REPLACE INTO" below.

Example scenario:

Binlog events:  INSERT(id=1, ...), UPDATE(id=1, ...), UPDATE(id=1, ...), DELETE(id=2)
Buffered map:   {1: {row: <latest image>}, 2: {isDelete}}
Applied:        UpsertRows({id=1, ...}); DeleteKeys({id=2});
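
The deduplication in the scenario above can be sketched with an ordinary Go map. This is a minimal illustration, not the package's implementation: the `change` and `bufferedChanges` types, the string key (standing in for the primary-key hash), and the `replay` helper are all assumptions made for the example.

```go
package main

import "fmt"

// change is one pending entry: a delete marker or the latest row image.
type change struct {
	isDelete bool
	row      []any
}

// bufferedChanges is a toy stand-in for the buffered map. The string key
// plays the role of the primary-key hash (hypothetical simplification).
type bufferedChanges map[string]change

// hasChanged records an event. A later event for the same key overwrites
// the earlier one, so only the final state survives.
func (b bufferedChanges) hasChanged(key string, row []any, deleted bool) {
	if deleted {
		b[key] = change{isDelete: true}
		return
	}
	b[key] = change{row: row}
}

// split separates pending changes into rows to upsert and keys to delete.
func (b bufferedChanges) split() (upserts [][]any, deletes []string) {
	for key, c := range b {
		if c.isDelete {
			deletes = append(deletes, key)
		} else {
			upserts = append(upserts, c.row)
		}
	}
	return upserts, deletes
}

// replay feeds the four binlog events from the scenario into a fresh buffer.
func replay() bufferedChanges {
	b := bufferedChanges{}
	b.hasChanged("1", []any{1, "v1"}, false) // INSERT(id=1)
	b.hasChanged("1", []any{1, "v2"}, false) // UPDATE(id=1)
	b.hasChanged("1", []any{1, "v3"}, false) // UPDATE(id=1)
	b.hasChanged("2", nil, true)             // DELETE(id=2)
	return b
}

func main() {
	upserts, deletes := replay().split()
	fmt.Println(len(upserts), "upsert(s),", len(deletes), "delete(s)")
}
```

Four events collapse to two pending entries: one upsert carrying the latest image for id=1 and one delete for id=2.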

FIFO fallback for non-memory-comparable primary keys

For tables with non-memory-comparable primary keys (e.g. VARCHAR with a case-insensitive collation), the subscription uses LWW buffered-map dedup during the copy phase and switches to an internal FIFO queue post-copy. The queue still stores row images inline and applies them via the applier — there is no REPLACE INTO ... SELECT, so the #746 fix and cross-server move support (issue #607) are preserved. The queue exists only to preserve binlog order: collation-equivalent keys like "A" and "a" hash to different map slots but resolve to the same MySQL row, so a map's non-deterministic iteration would apply events out of order. FIFO replay through the applier preserves binlog order; the target's own collation-aware uniqueness then collapses the events onto the right row.

During the copy phase the chunker's own SELECT covers in-window case-collision races, so LWW map dedup is safe and considerably faster. When the watermark optimization is disabled at the end of the copy phase, SetWatermarkOptimization drains the map inline and the subscription switches into queue mode for the cutover/checksum window. The post-cutover checksum (with FixDifferences=true) repairs any residual divergence.

Memory-comparable PKs always use the buffered map, since map-key equality matches MySQL row identity.
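
The "A" versus "a" problem can be seen in a few lines of Go. The sketch below is an illustration under stated assumptions: `strings.ToLower` is only a rough stand-in for a case-insensitive MySQL collation, and the `event` type and both helpers are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a buffered binlog change keyed by a string primary key.
type event struct {
	key string
	val string
}

// distinctGoKeys counts how many slots a byte-wise Go map would use.
func distinctGoKeys(events []event) int {
	seen := map[string]struct{}{}
	for _, e := range events {
		seen[e.key] = struct{}{}
	}
	return len(seen)
}

// replayFIFO applies events in binlog order to a toy target whose
// uniqueness is case-insensitive (approximated with strings.ToLower).
func replayFIFO(events []event) map[string]string {
	target := map[string]string{}
	for _, e := range events {
		target[strings.ToLower(e.key)] = e.val
	}
	return target
}

func main() {
	events := []event{{"A", "first"}, {"a", "second"}}
	fmt.Println(distinctGoKeys(events))  // two map slots for one MySQL row
	fmt.Println(replayFIFO(events)["a"]) // the later event wins under FIFO
}
```

A Go map sees two keys where the target sees one row, so only an ordered replay guarantees that the later event is the one that sticks.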

Applier idempotence via REPLACE INTO (#847)

The applier writes a multi-row statement per batch. We use:

REPLACE INTO target (cols) VALUES (...), (...), ...;

rather than INSERT ... ON DUPLICATE KEY UPDATE. The choice matters whenever two rows in the same batch can collide on a unique key — typically because a source-side transaction legally moves a unique value between rows:

-- Legal in source: deactivate one row, then activate another,
-- inside a single transaction. UNIQUE(slot_id) allows NULLs to
-- duplicate, so the invariant holds.
START TRANSACTION;
UPDATE t SET slot_id = NULL  WHERE id = 1;  -- was 'S'
UPDATE t SET slot_id = 'S'   WHERE id = 2;  -- was NULL
COMMIT;

With INSERT ... ON DUPLICATE KEY UPDATE, MySQL processes the multi-row VALUES list in array order and resolves only the first conflict on each row (via the UPDATE clause). If the resulting update introduces a second unique-key collision the statement fails with Error 1062. The map's randomized iteration meant a swap pair could land "activate-first" in the batch, hitting that exact failure.

REPLACE INTO is order-independent for this case. Per the docs:

REPLACE works exactly like INSERT, except that if an old row in the table has the same value as a new row for a PRIMARY KEY or a UNIQUE index, the old row is deleted before the new row is inserted.

So each row's conflicts — on PK or any unique index — are deleted before that row's insert runs, irrespective of where the conflicting row sits in the batch. The swap pair collapses to "delete the previous holder, insert the new holder" and the order of the two events inside the batch doesn't matter.

This is the same robustness the pre-#821 deltaMap had with REPLACE INTO target SELECT FROM source, but without the read-after-commit race that motivated #746 — we supply the inline row image, not a SELECT against source.
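
For illustration, a multi-row REPLACE statement of the shape shown above could be assembled as follows. This is a sketch only: `buildReplace` is a hypothetical helper, and the use of `?` placeholders (rather than inlined values) is an assumption, not a description of how the applier builds its SQL.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent backtick-quotes an identifier, doubling embedded backticks.
func quoteIdent(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// buildReplace returns a multi-row REPLACE statement with one placeholder
// tuple per row. Values are expected to be bound separately.
func buildReplace(table string, cols []string, rows int) string {
	quoted := make([]string, len(cols))
	for i, c := range cols {
		quoted[i] = quoteIdent(c)
	}
	// One "(?, ?, ...)" tuple sized to the column list.
	tuple := "(" + strings.TrimSuffix(strings.Repeat("?, ", len(cols)), ", ") + ")"
	tuples := make([]string, rows)
	for i := range tuples {
		tuples[i] = tuple
	}
	return fmt.Sprintf("REPLACE INTO %s (%s) VALUES %s",
		quoteIdent(table),
		strings.Join(quoted, ", "),
		strings.Join(tuples, ", "))
}

func main() {
	fmt.Println(buildReplace("t", []string{"id", "slot_id"}, 2))
}
```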

Eventual consistency between batches

REPLACE's "delete any unique-key conflict before each insert" semantic means a single REPLACE statement can delete more rows than the ones in its VALUES list — specifically, any row currently in the destination that previously held a unique value the new row is now claiming. That row is briefly missing from the destination until its own event arrives in a later batch (or in the same batch but processed later) and re-inserts it.

Concretely, for the swap pair above with batches of size 1:

Step | Batch                     | Destination state
0    | -                         | id=1: 'S', id=2: NULL
1    | REPLACE (id=1, slot=NULL) | id=1: NULL, id=2: NULL
2    | REPLACE (id=2, slot='S')  | id=1: NULL, id=2: 'S'

And for the same swap pair if the activate landed first across batches:

Step | Batch                     | Destination state
0    | -                         | id=1: 'S', id=2: NULL
1    | REPLACE (id=2, slot='S')  | id=2: 'S' (id=1 deleted: unique-key conflict on 'S')
2    | REPLACE (id=1, slot=NULL) | id=1: NULL, id=2: 'S' (id=1 re-inserted)

Binlog ordering gives us the first table in practice — within a single source-side transaction, the deactivate event has a lower binlog position than the activate — but Spirit's correctness does not depend on which case occurs. The destination converges to source's current state once the last unflushed event for each affected PK has been applied.

This eventual consistency is safe because the bufferedMap is an up-to-date and disjoint representation of pending changes: every PK appears at most once at flush time, holding the latest row image MySQL emitted for it. Any row transiently deleted by REPLACE's conflict resolution is therefore guaranteed to have its own event in the buffer (or arriving shortly) — its row image isn't lost, just temporarily not yet applied. The post-cutover checksum (with FixDifferences=true) is the backstop for anything that slips through.
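
The convergence argument can be checked with a small in-memory model of REPLACE. The sketch below assumes a toy table with a primary key and one nullable unique column; the `table` type and its `replace` method are hypothetical and only mimic the documented "delete conflicts, then insert" behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// row models t(id PRIMARY KEY, slot_id UNIQUE NULL); a nil slot is SQL NULL.
type row struct {
	id   int
	slot *string
}

// table is a toy destination keyed by primary key.
type table map[int]*string

// replace mimics REPLACE INTO for one row: delete any row that conflicts
// on the primary key or on a non-NULL unique value, then insert.
func (t table) replace(r row) {
	delete(t, r.id)
	if r.slot != nil {
		for id, s := range t {
			if s != nil && *s == *r.slot {
				delete(t, id)
			}
		}
	}
	t[r.id] = r.slot
}

// describe renders the state of ids 1 and 2 in a fixed order.
func describe(t table) string {
	var parts []string
	for _, id := range []int{1, 2} {
		s, ok := t[id]
		switch {
		case !ok:
			parts = append(parts, fmt.Sprintf("id=%d: (missing)", id))
		case s == nil:
			parts = append(parts, fmt.Sprintf("id=%d: NULL", id))
		default:
			parts = append(parts, fmt.Sprintf("id=%d: '%s'", id, *s))
		}
	}
	return strings.Join(parts, ", ")
}

// apply starts from id=1: 'S', id=2: NULL and runs one batch per event.
func apply(events []row) string {
	s := "S"
	t := table{1: &s, 2: nil}
	for _, e := range events {
		t.replace(e)
	}
	return describe(t)
}

// swapPair returns the deactivate and activate events from the example.
func swapPair() (deactivate, activate row) {
	s := "S"
	return row{id: 1, slot: nil}, row{id: 2, slot: &s}
}

func main() {
	d, a := swapPair()
	fmt.Println(apply([]row{d, a})) // deactivate first
	fmt.Println(apply([]row{a, d})) // activate first
}
```

Both orders end in the same destination state, which is the property the two tables above describe.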

See TestBufferedMapSwapPairFlushesViaReplace (unit) and TestSwapPairEndToEndViaReplace (end-to-end) for the regression gates.

Features

Watermark Optimization

The watermark optimization is a critical performance feature that prevents the replication client from doing redundant work during the copy phase.

The Problem: During the initial copy phase, the copier is reading rows from the source table and writing them to the new table. Meanwhile, the replication client is also receiving binlog events for those same rows. Without optimization, we would:

  1. Copy row with id=1000 from source to target
  2. Receive a binlog event for id=1000 (from before the copy)
  3. Apply the binlog change, overwriting what we just copied
  4. Result: Wasted work and potential deadlocks

The Solution: The copier maintains a "watermark" representing its progress. The replication client uses this watermark to filter changes:

  • High watermark: Skip changes for rows that haven't been copied yet (they'll be picked up by the copier)
  • Low watermark: Skip changes for rows that are currently being copied (avoid races with the copier, which may cause deadlocks/lock waits)

if chunker.KeyAboveHighWatermark(key[0]) {
    return  // Skip, copier will handle this
}

if !chunker.KeyBelowLowWatermark(key[0]) {
    continue  // Skip, copier is actively working on this range
}

Important: The watermark optimization is disabled before the final cutover to ensure all changes are applied regardless of the copier's position.
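
The two checks can be illustrated with a toy chunker over integer keys. Everything here is an assumption made for the sketch: `toyChunker`, its `low`/`high` fields, the strict comparisons, and `classify` are hypothetical and are not taken from Spirit's chunkers.

```go
package main

import "fmt"

// toyChunker tracks copy progress over an integer key space. It is a
// hypothetical stand-in, not one of Spirit's chunker implementations.
type toyChunker struct {
	low  int64 // keys below this are assumed to be fully copied
	high int64 // keys above this are assumed to be not yet handed out
}

func (c toyChunker) KeyAboveHighWatermark(key int64) bool { return key > c.high }
func (c toyChunker) KeyBelowLowWatermark(key int64) bool  { return key < c.low }

// classify reports what the watermark filter decides for one key.
func classify(c toyChunker, key int64) string {
	switch {
	case c.KeyAboveHighWatermark(key):
		return "skip-above-high" // the copier will read the current row later
	case !c.KeyBelowLowWatermark(key):
		return "skip-not-below-low" // the copier may be working on this range
	default:
		return "apply" // already copied, so the change must be applied
	}
}

func main() {
	c := toyChunker{low: 1000, high: 2000}
	for _, key := range []int64{500, 1500, 2500} {
		fmt.Println(key, classify(c, key))
	}
}
```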

Checkpointing

The replication client tracks two positions:

  • Buffered position: All events have been read from the server and stored in memory
  • Flushed position: All events have been successfully applied to the target table

// Get the safe checkpoint position
pos := client.GetBinlogApplyPosition()

// Resume from a checkpoint
client.SetFlushedPos(savedPosition)
err := client.Run(ctx)

Periodically, changes are flushed to advance the flushed position, which is then used as part of checkpoints. Because all replication changes are idempotent, it is understood that on recovery some changes will effectively be re-flushed, and the last ~1 minute of progress may have been lost.

Final Cutover coordination

Before a cutover operation can run, it's important to ensure that there are no unapplied replication changes. The recommended approach is to first call Flush(ctx) without a lock, and then repeat the flush with the lock held. For example:

// Flush most pending changes first, without a lock, so that the
// flush under the lock is as short as possible.
if err := client.Flush(ctx); err != nil {
    return err
}

// Acquire table lock (releasing it on every return path is omitted here)
lock, err := dbconn.LockTable(ctx, db, sourceTable)
if err != nil {
    return err
}

// Flush all remaining changes under the lock. This can fail if the
// pending changes cannot be flushed fast enough.
if err := client.FlushUnderTableLock(ctx, lock); err != nil {
    return err // abandon the cutover and retry later
}

// This check should be redundant, but we verify everything is applied
if !client.AllChangesFlushed() {
    return errors.New("changes still pending")
}

// Safe to cutover now

client.Flush() retries in a loop until the number of pending changes is considered trivial (currently <10K). It is important to handle errors correctly here, because FlushUnderTableLock may fail if it can't flush the pending changes fast enough. This is your cue to abandon the cutover operation for now, and try again when the server is under less load.
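
The "flush until trivial" loop can be sketched as below. This is an illustration, not the client's code: the `flusher` interface, `fakeFlusher`, and the `maxRounds` bound are hypothetical, and the threshold simply mirrors the "<10K" figure quoted above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

const trivialThreshold = 10_000 // mirrors the "<10K" figure in the text

// flusher is a hypothetical interface: flushOnce applies what is pending
// and returns how many changes accumulated while it was running.
type flusher interface {
	flushOnce(ctx context.Context) (remaining int, err error)
}

// flushUntilTrivial loops until the backlog drops below the threshold, the
// context is cancelled, or maxRounds (an assumption of this sketch) runs out.
func flushUntilTrivial(ctx context.Context, f flusher, maxRounds int) (int, error) {
	for round := 1; round <= maxRounds; round++ {
		if err := ctx.Err(); err != nil {
			return round - 1, err
		}
		remaining, err := f.flushOnce(ctx)
		if err != nil {
			return round, err
		}
		if remaining < trivialThreshold {
			return round, nil
		}
	}
	return maxRounds, errors.New("backlog not trivial after max rounds")
}

// fakeFlusher replays a scripted sequence of backlog sizes.
type fakeFlusher struct{ backlog []int }

func (f *fakeFlusher) flushOnce(context.Context) (int, error) {
	if len(f.backlog) == 0 {
		return 0, nil
	}
	n := f.backlog[0]
	f.backlog = f.backlog[1:]
	return n, nil
}

// demo runs the loop against a backlog that shrinks over three rounds.
func demo() (int, error) {
	f := &fakeFlusher{backlog: []int{50_000, 20_000, 500}}
	return flushUntilTrivial(context.Background(), f, 10)
}

func main() {
	rounds, err := demo()
	fmt.Println("rounds:", rounds, "err:", err)
}
```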

Memory backpressure

Each subscription approximates the bytes it is holding in memory (row image + key bytes per buffered change) and parks HasChanged on a per-subscription condition variable when the total reaches DefaultSubscriptionSoftLimitBytes (256 MiB). This keeps wide rows — LONGTEXT, BLOB, large JSON — from OOMing the migrator when the source's write rate outpaces the applier.

The cap is soft: the wait is checked before a change is added, against the buffer's current pre-add size. A row is therefore always admitted whenever sizeBytes < softLimitBytes, even if its own size pushes the total well past the limit; the cap only blocks new arrivals once the buffer is already at or over it. This is intentional — it preserves forward progress regardless of row width — but it does mean peak memory can exceed DefaultSubscriptionSoftLimitBytes by up to one oversized row's worth before the next caller parks.

Override via ClientConfig.SubscriptionSoftLimitBytes; pass a negative value to disable the cap entirely. The times_parked_on_soft_limit and size_bytes fields appear in the watermark-toggled log line, and keys_added / keys_dropped_above_high / keys_skipped_not_below_low provide the surrounding context.

Limitation — binlog retention: while parked, the binlog reader makes no progress. If the source rotates past the reader's current position (binlog_expire_logs_seconds) before the buffer drains, the reader will fail to resume and the migration will abort. Tune the soft limit and source retention together for sustained high-write workloads.
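
The "check before add" behaviour of the soft cap can be sketched with a `sync.Cond`. This is a minimal model under assumptions: `softBuffer` and its methods are hypothetical, a full drain stands in for a flush, and neither the negative-limit override nor Close is modelled.

```go
package main

import (
	"fmt"
	"sync"
)

// softBuffer is a toy byte-accounted buffer with a soft cap.
type softBuffer struct {
	mu        sync.Mutex
	cond      *sync.Cond
	sizeBytes int64
	limit     int64
}

func newSoftBuffer(limit int64) *softBuffer {
	b := &softBuffer{limit: limit}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// add waits while the buffer is already at or over the limit, then admits
// the change regardless of its own size (the check uses the pre-add size).
func (b *softBuffer) add(n int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.sizeBytes >= b.limit {
		b.cond.Wait()
	}
	b.sizeBytes += n
}

// drain simulates a flush: it empties the buffer and wakes parked callers.
func (b *softBuffer) drain() {
	b.mu.Lock()
	b.sizeBytes = 0
	b.mu.Unlock()
	b.cond.Broadcast()
}

func (b *softBuffer) size() int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.sizeBytes
}

// demo admits an oversized row, then shows a second add completing only
// once the buffer has been drained.
func demo() (peak, final int64) {
	b := newSoftBuffer(10)
	b.add(25) // admitted: the buffer was empty, and 0 < 10
	peak = b.size()
	done := make(chan struct{})
	go func() {
		b.add(5) // parks if it runs before drain, because 25 >= 10
		close(done)
	}()
	b.drain()
	<-done
	return peak, b.size()
}

func main() {
	peak, final := demo()
	fmt.Println("peak:", peak, "final:", final)
}
```

The peak exceeds the limit by one oversized row, which matches the description of the cap as soft.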

Other Minor Features

  • Automatic recovery: Handles transient errors and reconnects to the binlog stream without data loss
  • DDL detection: Monitors for schema changes and notifies the migration coordinator. This is used to abandon any schema changes if the table was externally modified.

See Also

Documentation

Overview

Package repl contains binary log subscription functionality.

Constants

const (

	// DefaultBatchSize is the fixed number of rows in each batched REPLACE/
	// DELETE statement that the binlog applier emits against the _new
	// table. Larger is better, but we need to keep the run-time of the
	// statement well below dbconn.maximumLockTime so that it doesn't
	// cause copy-row tasks to fail. On Aurora tables with
	// out-of-cache workloads that copy ~300 rows per second this is close
	// to the safe ceiling.
	//
	// Was previously an initial value for an adaptive sizer (feedback()
	// driven by p90 apply time). That mechanism was meaningful when the
	// applier issued `REPLACE INTO _new ... SELECT FROM source` and S-locked
	// rows on the live table, but after #853 the applier emits inline
	// VALUES against _new only — no source-side locks — and the batches
	// are strictly serial inside flushBatch. There's nothing left to
	// throttle, so the batch size is just a constant. See issue #869.
	DefaultBatchSize = 1000

	// DefaultFlushInterval is how often the client flushes all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second
	// DefaultSubscriptionSoftLimitBytes caps the approximate memory held
	// per subscription before HasChanged starts blocking on the buffered
	// map's condition variable. The cap is "soft": a single oversized
	// row admitted when the buffer is empty will exceed the limit, and
	// the next caller will park until that row drains. This keeps wide
	// rows (LONGTEXT / BLOB / large JSON) from OOMing the migrator
	// while still guaranteeing forward progress regardless of row width.
	// See pkg/repl/subscription_buffered.go for the accounting model.
	//
	// Operators should be aware that pausing the binlog reader for an
	// extended period risks falling past the source's binlog retention
	// (binlog_expire_logs_seconds). Tune this value, or the source's
	// retention, accordingly.
	DefaultSubscriptionSoftLimitBytes = 256 << 20
	// DefaultTimeout is how long BlockWait is supposed to wait before returning errors.
	DefaultTimeout = 30 * time.Second
)

Variables

var (

	// ErrChangesNotFlushed indicates that not all changes have been flushed from the replication feed.
	ErrChangesNotFlushed = errors.New("not all changes flushed")
)

Functions

func NewServerID

func NewServerID() uint32

NewServerID generates a unique server ID to avoid conflicts with other binlog readers. Uses crypto/rand combined with an atomic counter to ensure uniqueness even when called concurrently. Returns a value in the range 1001-4294967295 to avoid conflicts with typical MySQL server IDs (0-1000).
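
One way to combine a random value with an atomic counter and stay inside the documented range is sketched below. This is not the package's implementation: `newServerID`, the constants, and the modulo mapping are assumptions, and the sketch only makes collisions unlikely rather than impossible.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

const (
	minServerID = 1001
	maxServerID = 4294967295
)

// counter makes concurrent calls differ even if the random values collide.
var counter atomic.Uint32

// newServerID returns an ID in [minServerID, maxServerID].
func newServerID() (uint32, error) {
	var buf [4]byte
	if _, err := rand.Read(buf[:]); err != nil {
		return 0, err
	}
	// Work in uint64 so the addition and the range mapping cannot overflow.
	r := uint64(binary.BigEndian.Uint32(buf[:])) + uint64(counter.Add(1))
	span := uint64(maxServerID - minServerID + 1)
	return uint32(minServerID + r%span), nil
}

func main() {
	id, err := newServerID()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("server id in range:", id >= minServerID)
}
```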

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db *sql.DB, host string, username, password string, appl applier.Applier, config *ClientConfig) *Client

NewClient creates a new Client instance. The appl argument (the applier) is required!

func (*Client) AddSubscription

func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.MappedChunker) error

AddSubscription adds a new subscription. Returns an error if a subscription already exists for the given table.

func (*Client) AllChangesFlushed

func (c *Client) AllChangesFlushed() bool

func (*Client) BlockWait

func (c *Client) BlockWait(ctx context.Context) error

BlockWait blocks until all changes are *buffered*. For example, if the server's current position is 1234 but our buffered position is only 100, we need to read all the events until we reach >= 1234. We do not need to guarantee that they are flushed though, so you need to call Flush() to do that. This call times out! The default timeout is DefaultTimeout (30 seconds), after which an error will be returned.

func (*Client) Close

func (c *Client) Close()

func (*Client) Flush

func (c *Client) Flush(ctx context.Context) error

Flush empties the changeset in a loop until the amount of changes is considered "trivial". The loop is required, because changes continue to be added while the flush is occurring.

func (*Client) FlushUnderTableLock

func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error

FlushUnderTableLock is a final flush under an exclusive table lock using the connection that holds a write lock. Because flushing generates binary log events, we actually want to call flush *twice*:

  • The first time flushes the pending changes to the new table.
  • We then ensure that we have all the binary log changes read from the server.
  • The second time reads through the changes generated by the first flush and updates the in-memory applied position to match the server's position. This is required so that the binlog position is up to date for the c.AllChangesFlushed() check.

func (*Client) GetBinlogApplyPosition

func (c *Client) GetBinlogApplyPosition() mysql.Position

func (*Client) GetDeltaLen

func (c *Client) GetDeltaLen() int

GetDeltaLen returns the total number of changes that are pending across all subscriptions.

func (*Client) Run

func (c *Client) Run(ctx context.Context) error

Run initializes the binlog syncer and starts the binlog reader. It returns an error if the initialization fails.

func (*Client) SetFlushedPos

func (c *Client) SetFlushedPos(pos mysql.Position)

SetFlushedPos updates the known safe position that all changes have been flushed. It is used for resuming from a checkpoint.

func (*Client) SetWatermarkOptimization added in v0.10.1

func (c *Client) SetWatermarkOptimization(ctx context.Context, newVal bool) error

SetWatermarkOptimization sets both high and low watermark optimizations for all subscriptions. This should be disabled before checksum/cutover to ensure all changes are flushed regardless of watermark position.

Each subscription may drain its outgoing store on the toggle (see bufferedMap.SetWatermarkOptimization), so this can fail with the drain error. If one subscription fails, subsequent subscriptions are not touched and the caller should treat the operation as not-yet-applied.

Subscriptions are toggled against a snapshot so a long-running drain on one subscription doesn't block processRowsEvent from finding subscriptions for unrelated tables.

func (*Client) StartPeriodicFlush

func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)

StartPeriodicFlush starts a goroutine that periodically flushes the binlog changeset, used by the migrator to advance the binlog position. Registration of the cancel/done pair happens synchronously in the caller's goroutine before the loop is spawned, so a follow-up StopPeriodicFlush is guaranteed to observe the registration. Callers MUST NOT prefix with `go` — the loop is spawned internally.

Calling StartPeriodicFlush while a periodic flush is already running is a no-op.

func (*Client) StopPeriodicFlush

func (c *Client) StopPeriodicFlush()

StopPeriodicFlush stops the periodic flush goroutine started by StartPeriodicFlush and blocks until that goroutine has fully exited. Safe to call when no periodic flush is running (no-op).
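
The start/stop contract described above (synchronous registration, internal goroutine, blocking stop, no-op when idle) can be sketched as follows. The `periodic` type and its methods are hypothetical and only illustrate the pattern; a real loop would flush where the sketch increments a counter.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// periodic owns at most one background loop at a time.
type periodic struct {
	mu     sync.Mutex
	cancel context.CancelFunc
	done   chan struct{}
	ticks  atomic.Int64
}

// start registers cancel/done synchronously in the caller's goroutine and
// then spawns the loop itself. Calling it while running is a no-op.
func (p *periodic) start(ctx context.Context, interval time.Duration) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.cancel != nil {
		return
	}
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	p.cancel, p.done = cancel, done
	go func() {
		defer close(done)
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				p.ticks.Add(1) // placeholder for the periodic flush
			}
		}
	}()
}

// stop cancels the loop and blocks until it has exited; no-op when idle.
func (p *periodic) stop() {
	p.mu.Lock()
	cancel, done := p.cancel, p.done
	p.cancel, p.done = nil, nil
	p.mu.Unlock()
	if cancel == nil {
		return
	}
	cancel()
	<-done
}

// demo exercises the no-op paths and reports whether the loop is gone.
func demo() bool {
	p := &periodic{}
	p.stop() // nothing running: no-op
	p.start(context.Background(), time.Millisecond)
	p.start(context.Background(), time.Millisecond) // already running: no-op
	p.stop()
	p.stop() // second stop: no-op
	return p.cancel == nil && p.done == nil
}

func main() {
	fmt.Println("stopped cleanly:", demo())
}
```

Because registration happens before the goroutine is spawned, a stop issued immediately after start always finds the cancel/done pair.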

type ClientConfig

type ClientConfig struct {
	Logger   *slog.Logger
	ServerID uint32
	DBConfig *dbconn.DBConfig // Database configuration including TLS settings

	// CancelFunc is an optional callback from the caller (e.g. migration or move runner).
	// It is called when a DDL change is detected on a subscribed table, or when a fatal
	// stream error occurs (such as minimal RBR detection or exhausted streamer recreation
	// attempts). The caller is expected to handle cancellation and cleanup.
	// It returns true if the error was acted upon (caller actually cancelled),
	// or false if it was ignored (e.g. because the caller is already past cutover).
	CancelFunc func() bool

	// DDLFilterSchema, when set, broadens DDL detection to cancel on any DDL change
	// in the specified schema, rather than only on exact table matches against subscriptions.
	// This is used by the move runner to detect DDL on any table in the source database.
	DDLFilterSchema string

	// DDLFilterTables, when set alongside DDLFilterSchema, narrows the schema-level
	// DDL detection to only the specified table names. This is used for partial moves
	// where only specific tables from a schema are being moved — DDL on unrelated
	// tables in the same schema should not trigger cancellation.
	// If empty (and DDLFilterSchema is set), all tables in the schema trigger cancellation.
	DDLFilterTables []string

	// SubscriptionSoftLimitBytes overrides DefaultSubscriptionSoftLimitBytes
	// for new subscriptions. Set to a negative value to disable the cap
	// entirely (HasChanged will never block on memory). Zero (the
	// zero-value default) means use DefaultSubscriptionSoftLimitBytes.
	SubscriptionSoftLimitBytes int64
}
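
The three cases for SubscriptionSoftLimitBytes (negative, zero, positive) can be summarised in a small helper. `effectiveSoftLimit` and the local constant are hypothetical names used only for this sketch.

```go
package main

import "fmt"

// defaultSoftLimitBytes mirrors DefaultSubscriptionSoftLimitBytes (256 MiB).
const defaultSoftLimitBytes int64 = 256 << 20

// effectiveSoftLimit maps the configured value to the limit in force and
// whether the cap is enabled at all.
func effectiveSoftLimit(configured int64) (limit int64, enabled bool) {
	switch {
	case configured < 0:
		return 0, false // cap disabled: HasChanged never blocks on memory
	case configured == 0:
		return defaultSoftLimitBytes, true // zero value means "use the default"
	default:
		return configured, true
	}
}

func main() {
	for _, v := range []int64{-1, 0, 64 << 20} {
		limit, enabled := effectiveSoftLimit(v)
		fmt.Println(v, "->", limit, enabled)
	}
}
```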

func NewClientDefaultConfig

func NewClientDefaultConfig() *ClientConfig

NewClientDefaultConfig returns a default config for the replication client.

type Subscription

type Subscription interface {
	HasChanged(key, row []any, deleted bool)
	Length() int
	Flush(ctx context.Context, underLock bool, lock *dbconn.TableLock) (allChangesFlushed bool, err error)
	Tables() []*table.TableInfo // returns the tables related to the subscription in currentTable, newTable order

	// SetWatermarkOptimization toggles both high and low watermark
	// optimizations. For non-memory-comparable PKs toggling switches the
	// subscription between map mode and queue mode; on such a transition
	// it drains the outgoing store via the applier so only one store has
	// pending entries at a time. Returns the drain error if any.
	SetWatermarkOptimization(ctx context.Context, enabled bool) error

	// Close signals that no further events will be delivered. Any HasChanged
	// caller currently parked on backpressure (e.g. the bufferedMap soft
	// memory limit) is unblocked so the binlog reader goroutine can exit.
	// Close does NOT flush; pending changes are discarded along with the
	// subscription. It is safe to call more than once.
	Close()
}
