package repl
v0.11.0
Published: Mar 4, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

README

Replication Client

The replication client tracks changes to tables by acting as a MySQL replica. The go-mysql library does most of the heavy lifting by connecting to MySQL and parsing binary log events. Spirit's role is to manage subscriptions for each table being migrated, deduplicate changes, and coordinate with the copier to avoid redundant work.

Each table tracked is represented by a subscription, with three main implementations:

Subscription Implementations

Delta Map

The delta map is the preferred subscription type and is selected for tables with memory-comparable primary keys (integers, binary strings, etc.). It uses a map to track changes:

How it works:

  • Maintains a map of primaryKey -> (isDelete, originalKey)
  • Multiple changes to the same row are automatically deduplicated (only the final state is stored)
  • Flushes changes in parallel across multiple threads
  • Uses REPLACE INTO ... SELECT to apply changes efficiently

Advantages:

  • Excellent deduplication: If a row is modified 100 times, only one REPLACE operation is performed
  • Parallel flushing: Independent keys can be written concurrently for maximum throughput
  • Memory efficient: Only stores the latest state for each key
  • Watermark optimization (when supported by the chunker): Can skip ranges of keys using both KeyAboveHighWatermark and KeyBelowLowWatermark

Limitations:

  • Requires memory-comparable primary keys (no VARCHAR, FLOAT, etc.)
  • Watermark optimizations are only effective when the selected chunker implements both KeyAboveHighWatermark and KeyBelowLowWatermark (e.g., the optimistic chunker for single-column AUTO_INCREMENT primary keys)
  • For composite or non-auto-increment primary keys (which use the composite chunker), both watermark methods currently return stub values, so these optimizations are effectively disabled (see issue #479)

Example scenario:

Binlog events:  INSERT(id=1), UPDATE(id=1), UPDATE(id=1), DELETE(id=2)
Delta map:      {1: REPLACE, 2: DELETE}
Applied:        REPLACE INTO ... WHERE id=1; DELETE FROM ... WHERE id=2;

Delta Queue

The delta queue is a fallback for tables with non-memory-comparable primary keys. It maintains a FIFO queue of changes:

How it works:

  • Maintains an ordered queue of (primaryKeyHash, isDelete) tuples
  • Changes are applied sequentially in the order they were received
  • Limited deduplication: only merges consecutive operations of the same type
  • Uses REPLACE INTO ... SELECT and DELETE statements

Advantages:

  • Universal compatibility: Works with any primary key type (VARCHAR, FLOAT, etc.)
  • Preserves order: Maintains the exact sequence of operations

Limitations:

  • Single-threaded flushing: Must process changes sequentially, limiting throughput
  • Poor deduplication: Stores all intermediate states, only merging consecutive operations
  • Higher memory usage: Can accumulate many operations for the same key
  • No watermark optimizations: neither the KeyAboveHighWatermark nor the KeyBelowLowWatermark optimization is possible

Example scenario:

Binlog events:  INSERT(k="abc"), UPDATE(k="def"), DELETE(k="def"), UPDATE(k="abc")
Delta queue:    [("abc", REPLACE), ("def", REPLACE), ("def", DELETE), ("abc", REPLACE)]
Applied:        REPLACE INTO ... WHERE k IN ("abc", "def"); DELETE FROM ... WHERE k="def"; REPLACE INTO ... WHERE k="abc";

Performance note: The delta queue performs significantly worse than the other two implementations. It should only be used when the primary key type makes the delta map impossible. It is so bad, in fact, that in the future it may be worth disabling it entirely and relying on the checksum to catch issues. See issue #475.

Buffered Map

The buffered map is a subscription type required for move operations where source and target are on different MySQL servers. It stores full row data and uses the applier interface:

How it works:

  • Maintains a map of primaryKeyHash -> (isDelete, fullRowData)
  • Stores complete row data in memory, not just primary keys
  • Uses the applier interface to write changes (supports sharding and remote targets)
  • Flushes changes in parallel using the applier's batching logic

Advantages:

  • Better concurrency: Doesn't hold locks on the source table during flush (no SELECT needed)
  • Supports sharding: Can distribute changes across multiple target databases via the applier
  • Flexible targets: Can potentially target non-MySQL destinations in the future
  • Watermark optimization: Supports both KeyAboveHighWatermark and KeyBelowLowWatermark optimizations

Limitations:

  • Significantly higher memory usage: Stores full row data for each changed key
  • Higher CPU usage: buffering full row data in the Spirit daemon adds CPU load to the migration process
  • More complex: Requires coordination with the applier layer
  • Not the default: Less battle-tested than delta map for single-server schema changes

When to use: Primarily for move operations where REPLACE INTO ... SELECT cannot be used because the source and target are on different MySQL servers.

Enabling: Provide a non-nil Applier in the client config. When an applier is configured, buffered map behavior is enabled automatically, and this is set up by default for move operations.

Features

Watermark Optimization

The watermark optimization is a critical performance feature that prevents the replication client from doing redundant work during the copy phase.

The Problem: During the initial copy phase, the copier is reading rows from the source table and writing them to the new table. Meanwhile, the replication client is also receiving binlog events for those same rows. Without optimization, we would:

  1. Copy row with id=1000 from source to target
  2. Receive a binlog event for id=1000 (from before the copy)
  3. Apply the binlog change, overwriting what we just copied
  4. Result: Wasted work and potential deadlocks

The Solution: The copier maintains a "watermark" representing its progress. The replication client uses this watermark to filter changes:

  • High watermark: Skip changes for rows that haven't been copied yet (they'll be picked up by the copier)
  • Low watermark: Skip changes for rows that are currently being copied (avoid races with the copier, which may cause deadlocks/lock waits)

if chunker.KeyAboveHighWatermark(key[0]) {
    return  // Skip, copier will handle this
}

if !chunker.KeyBelowLowWatermark(key[0]) {
    continue  // Skip, copier is actively working on this range
}

Important: The watermark optimization is disabled before the final cutover to ensure all changes are applied regardless of the copier's position.

Checkpointing

The replication client tracks two positions:

  • Buffered position: All events have been read from the server and stored in memory
  • Flushed position: All events have been successfully applied to the target table

// Get the safe checkpoint position
pos := client.GetBinlogApplyPosition()

// Resume from a checkpoint
client.SetFlushedPos(savedPosition)
err := client.Run(ctx)

Periodically, changes are flushed to advance the flushed position, which is then used as part of checkpoints. Because all replication changes are idempotent, it is understood that on recovery some changes will effectively be re-flushed, and the last ~1 minute of progress may have been lost.

Final Cutover Coordination

Before a cutover operation can run, it's important to ensure that there are no unapplied replication changes. The best practice is to first call Flush(ctx) without a lock, and then repeat the flush with the lock held, i.e.:

// Ensure most changes are up to date before we need to do this again
// with a lock held (ensures lock duration is as short as possible)
err = client.Flush(ctx)

// Acquire table lock
lock, err := dbconn.LockTable(ctx, db, sourceTable)

// Flush all remaining changes under the lock
err = client.FlushUnderTableLock(ctx, lock)

// This check should be redundant, but we verify everything is applied
if !client.AllChangesFlushed() {
    return errors.New("changes still pending")
}

// Safe to cutover now

client.Flush() retries in a loop until the number of pending changes is considered trivial (currently <10K). It is important to handle errors correctly here, because FlushUnderTableLock may fail if it cannot flush the pending changes fast enough. This is your cue to abandon the cutover operation for now and try again when the server is under less load.

Other Minor Features
  • Automatic recovery: Handles transient errors and reconnects to the binlog stream without data loss
  • DDL detection: Monitors for schema changes and notifies the migration coordinator. This is used to abandon any schema changes if the table was externally modified.

See Also

Documentation

Overview

Package repl contains binary log subscription functionality.

Index

Constants

const (

	// DefaultBatchSize is the number of rows in each batched REPLACE/DELETE statement.
	// Larger is better, but we need to keep the run-time of the statement well below
	// dbconn.maximumLockTime so that it doesn't cause copy-row tasks to fail.
	// Since some of our Aurora tables with out-of-cache workloads only copy ~300 rows per second,
	// we probably shouldn't set this any larger than about 1K. It will also use
	// multiple-flush-threads, which should help it group commit and still be fast.
	// This is only used as an initial starting value. It will auto-scale based on the DefaultTargetBatchTime.
	DefaultBatchSize = 1000

	// DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements.
	DefaultTargetBatchTime = time.Millisecond * 500

	// DefaultFlushInterval is the time that the client will flush all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second
	// DefaultTimeout is how long BlockWait is supposed to wait before returning errors.
	DefaultTimeout = 30 * time.Second
)

Variables

This section is empty.

Functions

func DecodeSchemaTable added in v0.10.2

func DecodeSchemaTable(encoded string) (string, string)

func EncodeSchemaTable

func EncodeSchemaTable(schema, table string) string

func NewServerID

func NewServerID() uint32

NewServerID generates a unique server ID to avoid conflicts with other binlog readers. Uses crypto/rand combined with an atomic counter to ensure uniqueness even when called concurrently. Returns a value in the range 1001-4294967295 to avoid conflicts with typical MySQL server IDs (0-1000).

Types

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db *sql.DB, host string, username, password string, config *ClientConfig) *Client

NewClient creates a new Client instance.

func (*Client) AddSubscription

func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.Chunker) error

AddSubscription adds a new subscription. Returns an error if a subscription already exists for the given table.

func (*Client) AllChangesFlushed

func (c *Client) AllChangesFlushed() bool

func (*Client) BlockWait

func (c *Client) BlockWait(ctx context.Context) error

BlockWait blocks until all changes are *buffered*. i.e. the server's current position is 1234, but our buffered position is only 100. We need to read all the events until we reach >= 1234. We do not need to guarantee that they are flushed; call Flush() to do that. This call times out! The default timeout is DefaultTimeout (30 seconds), after which an error will be returned.

func (*Client) Close

func (c *Client) Close()

func (*Client) Flush

func (c *Client) Flush(ctx context.Context) error

Flush empties the changeset in a loop until the amount of changes is considered "trivial". The loop is required, because changes continue to be added while the flush is occurring.

func (*Client) FlushUnderTableLock

func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error

FlushUnderTableLock is a final flush under an exclusive table lock using the connection that holds a write lock. Because flushing generates binary log events, we actually want to call flush *twice*:

  • The first time flushes the pending changes to the new table.
  • We then ensure that we have all the binary log changes read from the server.
  • The second time reads through the changes generated by the first flush and updates the in-memory applied position to match the server's position. This ensures the binlog position is updated, so the c.AllChangesFlushed() check passes.

func (*Client) GetBinlogApplyPosition

func (c *Client) GetBinlogApplyPosition() mysql.Position

func (*Client) GetDeltaLen

func (c *Client) GetDeltaLen() int

GetDeltaLen returns the total number of changes that are pending across all subscriptions. Acquires the client lock for thread safety.

func (*Client) Run

func (c *Client) Run(ctx context.Context) (err error)

Run initializes the binlog syncer and starts the binlog reader. It returns an error if the initialization fails.

func (*Client) SetDDLNotificationChannel

func (c *Client) SetDDLNotificationChannel(ch chan string)

func (*Client) SetFlushedPos

func (c *Client) SetFlushedPos(pos mysql.Position)

SetFlushedPos updates the known safe position that all changes have been flushed. It is used for resuming from a checkpoint.

func (*Client) SetWatermarkOptimization added in v0.10.1

func (c *Client) SetWatermarkOptimization(newVal bool)

SetWatermarkOptimization sets both high and low watermark optimizations for all subscriptions. This should be disabled before checksum/cutover to ensure all changes are flushed regardless of watermark position.

func (*Client) StartPeriodicFlush

func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)

StartPeriodicFlush starts a loop that periodically flushes the binlog changeset. This is used by the migrator to ensure the binlog position is advanced.

func (*Client) StopPeriodicFlush

func (c *Client) StopPeriodicFlush()

StopPeriodicFlush disables the periodic flush, and guarantees that when it returns there is no flush currently running.

type ClientConfig

type ClientConfig struct {
	TargetBatchTime       time.Duration
	Concurrency           int
	Logger                *slog.Logger
	OnDDL                 chan string
	OnDDLDisableFiltering bool
	ServerID              uint32
	Applier               applier.Applier
	DBConfig              *dbconn.DBConfig // Database configuration including TLS settings
}

func NewClientDefaultConfig

func NewClientDefaultConfig() *ClientConfig

NewClientDefaultConfig returns a default config for the replication client.

type Subscription

type Subscription interface {
	HasChanged(key, row []any, deleted bool)
	Length() int
	Flush(ctx context.Context, underLock bool, lock *dbconn.TableLock) (allChangesFlushed bool, err error)
	Tables() []*table.TableInfo            // returns the tables related to the subscription in currentTable, newTable order
	SetWatermarkOptimization(enabled bool) // Controls both high and low watermark optimizations
}
