applier

package
v0.14.0
Published: May 20, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

README

Appliers

Appliers are responsible for writing rows to one or more target(s) and are utilized by the copier/subscription components.

  • SingleTargetApplier: For standard (non-sharded) migrations to a single target database.
  • ShardedApplier: For migrations to Vitess-style sharded databases, where rows are distributed across multiple targets based on a hash function.

Both implementations share a common interface but have different internal architectures to handle their respective use cases efficiently.

History

The original implementation of Spirit relied on statements such as INSERT .. SELECT and REPLACE INTO, sending as much work as possible back to MySQL for processing. We now refer to this implementation as the unbuffered algorithm.

The unbuffered algorithm has two advantages: there are fewer edge cases that can corrupt data (for example, accidental charset or timezone conversions), and it sends less data across the network (which would otherwise take CPU cycles from both MySQL and Spirit to process). It has two downsides:

  1. INSERT .. SELECT statements are locking, and do not use MVCC on the SELECT side.
  2. It cannot be used to ship data between MySQL servers, for example in move/copy operations.

The first downside can be mitigated by using smaller chunks to yield the lock periodically, but there is no option to address the second.

Appliers were created to support an algorithm which we refer to as buffered, which is an implementation of DBLog. Changes are extracted from the source table(s), and then sent to the applier to be loaded into an underlying target.

The unbuffered copier remains the default for schema changes — opting into the buffered copier still requires --buffered. The applier itself, however, is always used by the replication client's bufferedMap subscription, which writes row images directly from the binlog instead of issuing REPLACE INTO ... SELECT (see issue #746). For move operations only the buffered copier is supported.

Why an Applier Abstraction?

Applier is an abstraction which encompasses all changes that can be applied to a target. The advantages of having an interface for this are:

  1. Complex Scenarios: Abstract away resharding operations and other complex topologies.
  2. Future Targets: Support non-MySQL targets or targets with different performance characteristics.

We do not intend for Spirit to support schema changes on anything other than MySQL, but it could in future be possible to use it to synchronize data between MySQL and a downstream such as PostgreSQL or Iceberg.

The applier layer provides several critical functions:

  1. Optimal Batching: Rows are split into "chunklets" that respect both MySQL's max_allowed_packet limit and optimal write sizes
  2. Parallel Processing: Multiple write workers fan-out and process chunklets concurrently for ideal use of group commit.
  3. Async Feedback: Callers are notified via callbacks when writes complete, allowing the copier to advance its watermark.
  4. Mixed Operations: Supports both async bulk copying (Apply) and synchronous operations (DeleteKeys, UpsertRows) needed by the subscription.

Without the applier layer, the copier would need to handle all of this complexity itself, making the code harder to maintain and test. The copier is agnostic to sharded migrations.

Core Concepts

Chunklets

A "chunklet" is an internal batching unit used by appliers. This is different from the "chunk" concept in pkg/table/, which refers to the range of rows the copier reads from the source table.

When the copier calls Apply() with a batch of rows (typically from one chunk), the applier splits those rows into smaller "chunklets" for writing. Each chunklet defaults to:

  • Row count: Maximum 1,000 rows per chunklet
  • Size: Maximum 1 MiB of estimated data per chunklet

The size limit exists because of MySQL's max_allowed_packet, which defaults to 64 MiB in MySQL 8.0. We use a conservative 1 MiB threshold to ensure we never approach that limit even with very wide rows. The row count limit provides a reasonable upper bound for tables with narrow rows.

Important: A single row can exceed 1 MiB by itself. In this edge case, the row will be placed in its own chunklet regardless of size, relying on max_allowed_packet being large enough. This is rare in practice.
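The splitting rule described above can be sketched as follows. This is a minimal illustration and not the package's actual code: splitIntoChunklets and estimateRowSize are hypothetical names, and the size estimator is a deliberately crude assumption.

```go
package main

import "fmt"

// estimateRowSize is a hypothetical size estimator: it sums the byte length
// of string and []byte values and charges a flat 8 bytes for anything else.
func estimateRowSize(row []any) int {
	size := 0
	for _, v := range row {
		switch x := v.(type) {
		case string:
			size += len(x)
		case []byte:
			size += len(x)
		default:
			size += 8
		}
	}
	return size
}

// splitIntoChunklets groups rows so that each chunklet holds at most maxRows
// rows and at most maxBytes of estimated data. A row that exceeds maxBytes
// by itself still gets a chunklet of its own.
func splitIntoChunklets(rows [][]any, maxRows, maxBytes int) [][][]any {
	var chunklets [][][]any
	var current [][]any
	currentBytes := 0
	for _, row := range rows {
		rowBytes := estimateRowSize(row)
		full := len(current) >= maxRows || currentBytes+rowBytes > maxBytes
		if len(current) > 0 && full {
			chunklets = append(chunklets, current)
			current, currentBytes = nil, 0
		}
		current = append(current, row)
		currentBytes += rowBytes
	}
	if len(current) > 0 {
		chunklets = append(chunklets, current)
	}
	return chunklets
}

func main() {
	rows := make([][]any, 0, 2501)
	for i := 0; i < 2500; i++ {
		rows = append(rows, []any{i, "narrow"})
	}
	// One very wide row (2 MiB) that exceeds the size limit on its own.
	rows = append(rows, []any{2500, string(make([]byte, 2<<20))})

	for i, c := range splitIntoChunklets(rows, 1000, 1<<20) {
		fmt.Printf("chunklet %d: %d rows\n", i, len(c))
	}
}
```

With the default limits, narrow rows hit the row-count cap first, while wide rows hit the size cap first. The oversized row ends up alone in the final chunklet.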

Async vs Sync Operations

The applier interface provides both asynchronous and synchronous methods:

Asynchronous (used by copier):

  • Apply(ctx, chunk, rows, callback): Queues rows for writing and returns immediately. The callback is invoked when all rows have been written.

Synchronous (used by subscription):

  • DeleteKeys(ctx, sourceTable, targetTable, keys, lock): Deletes rows by primary key and waits for completion. Emits DELETE FROM target WHERE (pk) IN (...).
  • UpsertRows(ctx, mapping, rows, lock): Upserts rows using a ColumnMapping and waits for completion. Emits REPLACE INTO target (cols) VALUES (...) — see REPLACE INTO semantics below.
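To make the shape of the two emitted statements concrete, here is a small sketch that builds them with placeholders. The helper names (buildDelete, buildReplace, quote, tuples) are hypothetical, and whether the real applier uses placeholders or inlines escaped values is an assumption not confirmed by this document.

```go
package main

import (
	"fmt"
	"strings"
)

// quote wraps an identifier in backticks, doubling any embedded backtick.
func quote(ident string) string {
	return "`" + strings.ReplaceAll(ident, "`", "``") + "`"
}

// quoteAll quotes each identifier and joins them with commas.
func quoteAll(idents []string) string {
	quoted := make([]string, len(idents))
	for i, id := range idents {
		quoted[i] = quote(id)
	}
	return strings.Join(quoted, ", ")
}

// tuples returns n placeholder tuples of the given width, e.g. "(?, ?), (?, ?)".
func tuples(n, width int) string {
	one := "(" + strings.TrimSuffix(strings.Repeat("?, ", width), ", ") + ")"
	return strings.TrimSuffix(strings.Repeat(one+", ", n), ", ")
}

// buildDelete sketches DELETE FROM target WHERE (pk) IN (...).
func buildDelete(table string, pkCols []string, nKeys int) string {
	return fmt.Sprintf("DELETE FROM %s WHERE (%s) IN (%s)",
		quote(table), quoteAll(pkCols), tuples(nKeys, len(pkCols)))
}

// buildReplace sketches REPLACE INTO target (cols) VALUES (...).
func buildReplace(table string, cols []string, nRows int) string {
	return fmt.Sprintf("REPLACE INTO %s (%s) VALUES %s",
		quote(table), quoteAll(cols), tuples(nRows, len(cols)))
}

func main() {
	fmt.Println(buildDelete("t", []string{"id"}, 3))
	fmt.Println(buildReplace("t", []string{"id", "slot_id"}, 2))
}
```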

This distinction exists because:

  • The copier processes large batches of rows and benefits from async processing with callbacks to advance its watermark.
  • The subscription processes individual binlog events and needs immediate confirmation that changes have been applied before advancing the binlog position.

REPLACE INTO semantics and eventual consistency

UpsertRows uses REPLACE INTO target (cols) VALUES (...), not INSERT ... ON DUPLICATE KEY UPDATE. Per MySQL's manual:

REPLACE works exactly like INSERT, except that if an old row in the table has the same value as a new row for a PRIMARY KEY or a UNIQUE index, the old row is deleted before the new row is inserted.

Two consequences for callers:

  1. REPLACE may delete rows whose PKs are not in the rows argument. If a new row's image collides on a unique key with some other row currently in the destination (the previous holder of that unique value), REPLACE deletes that other row to make room. A single REPLACE statement may therefore delete more than one row. This is what makes the multi-row VALUES list order-independent — within a batch, every row's conflicts (on PK or any unique index) are resolved before its insert runs.

  2. The destination is only eventually consistent with source mid-flush. Between the moment REPLACE deletes a row to resolve a unique-key conflict and the moment that row's own event re-inserts it, the destination is briefly missing that row. Spirit relies on the replication client's bufferedMap being an up-to-date and disjoint representation of pending changes — every PK in the buffer holds the latest image MySQL has emitted for it — so any transiently-deleted row is guaranteed to be re-inserted as flushes progress. The destination converges back to source's current state once every event for each affected PK has been applied. The post-cutover checksum (with FixDifferences=true) is the backstop for any divergence that survives.

The row image is supplied inline (the binlog reader stored it on HasChanged); the applier never re-reads source. This avoids the binlog/visibility race fixed in #746 that earlier REPLACE INTO ... SELECT paths could lose to.

Why this matters for workloads that move unique values

The motivating case is a source-side transaction that legally moves a unique value between two rows:

START TRANSACTION;
UPDATE t SET slot_id = NULL WHERE id = 1;  -- was 'S'
UPDATE t SET slot_id = 'S'  WHERE id = 2;  -- was NULL
COMMIT;

With INSERT ... ON DUPLICATE KEY UPDATE, the random map iteration order in the subscription could land "activate id=2" before "deactivate id=1" in the same multi-row statement; MySQL would resolve id=2's UPDATE branch, then fail with Error 1062 (23000): Duplicate entry 'S' because id=1 still held the value. With REPLACE INTO the same batch in any order works: each REPLACE deletes the prior holder of 'S' before inserting its own row. See block/spirit#847.
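The order-independence claim can be checked with a small in-memory model of REPLACE. This is a simulation only: the row type, replaceInto, and apply are hypothetical, the table has just a primary key and one unique column, and an empty string stands in for NULL.

```go
package main

import "fmt"

// row models a record in table t. An empty slot stands in for SQL NULL,
// which never conflicts in a UNIQUE index.
type row struct {
	id   int
	slot string
}

// replaceInto mimics REPLACE on an in-memory table keyed by primary key:
// it deletes any row that conflicts on the PK or on the unique slot value,
// then inserts the new row.
func replaceInto(tbl map[int]row, r row) {
	delete(tbl, r.id)
	if r.slot != "" {
		for id, existing := range tbl {
			if existing.slot == r.slot {
				delete(tbl, id) // the previous holder of the unique value
			}
		}
	}
	tbl[r.id] = r
}

// apply starts from the destination state before the source transaction
// (id=1 holds 'S', id=2 holds NULL) and applies the batch in the given order.
func apply(batch []row) map[int]row {
	tbl := map[int]row{1: {1, "S"}, 2: {2, ""}}
	for _, r := range batch {
		replaceInto(tbl, r)
	}
	return tbl
}

func main() {
	deactivate := row{1, ""}
	activate := row{2, "S"}
	fmt.Println(apply([]row{deactivate, activate}))
	fmt.Println(apply([]row{activate, deactivate}))
}
```

In the second ordering, id=1 is briefly absent after the first REPLACE and is restored by its own event. That is the transient state described in the eventual-consistency note above.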

Callbacks and Feedback

When the copier calls Apply(), it provides a callback function:

callback := func(affectedRows int64, err error) {
    if err != nil {
        // Handle error
        return
    }
    // All rows have been written, advance watermark
    chunker.Feedback(affectedRows)
}
applier.Apply(ctx, chunk, rows, callback)

The applier tracks all pending work internally and invokes the callback only when:

  1. All chunklets for that batch have been written.
  2. OR an error occurs in any chunklet.

This allows the copier to continue reading and queuing more work without blocking, while still maintaining correctness by only advancing the watermark after writes complete.
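One way to get "callback fires once, either when every chunklet is done or on the first error" is a small counter guarded by a mutex. The sketch below is an assumption about how such tracking could look, not the package's implementation. pendingBatch, complete, and errWrite are hypothetical names, and passing the running row count alongside an error is a choice made here for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type applyCallback func(affectedRows int64, err error)

// errWrite is a hypothetical sentinel error used only by this example.
var errWrite = errors.New("write failed")

// pendingBatch tracks the chunklets of one Apply call and fires the
// callback exactly once.
type pendingBatch struct {
	mu        sync.Mutex
	remaining int
	affected  int64
	fired     bool
	callback  applyCallback
}

func newPendingBatch(chunklets int, cb applyCallback) *pendingBatch {
	return &pendingBatch{remaining: chunklets, callback: cb}
}

// complete records the result of one chunklet write. The callback runs
// outside the lock so that it may safely queue more work.
func (p *pendingBatch) complete(affected int64, err error) {
	p.mu.Lock()
	if p.fired {
		p.mu.Unlock()
		return // a previous chunklet already reported the outcome
	}
	p.remaining--
	p.affected += affected
	total := p.affected
	fire := err != nil || p.remaining == 0
	p.fired = fire
	p.mu.Unlock()

	if fire {
		p.callback(total, err)
	}
}

func main() {
	ok := newPendingBatch(3, func(n int64, err error) {
		fmt.Println("all chunklets written, rows:", n, "err:", err)
	})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ok.complete(100, nil)
		}()
	}
	wg.Wait()

	failed := newPendingBatch(2, func(_ int64, err error) {
		fmt.Println("batch failed:", err)
	})
	failed.complete(0, errWrite)
	failed.complete(100, nil) // ignored: the callback has already fired
}
```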

Implementation Details

For detailed information about the SingleTargetApplier and ShardedApplier implementations, see the inline code documentation in single_target.go and sharded.go.

The ShardedApplier has an important limitation: it only tracks changes by PRIMARY KEY, not by sharding column (vindex). This means DeleteKeys and UpsertRows must broadcast to all shards, and the vindex column must be immutable. See sharded.go for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Applier

type Applier interface {
	// Start initializes the applier and starts its workers
	Start(ctx context.Context) error

	// Apply sends rows to be written to the target(s).
	// The chunk parameter provides metadata about the source table and target table.
	// The rows parameter contains the actual row data to be written.
	// The callback is invoked when all rows are safely flushed.
	//
	// For the copier: callback will call chunker.Feedback()
	// For the subscription: callback will update binlog coordinates
	Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error

	// DeleteKeys deletes rows by their key values synchronously.
	// The keys are hashed key strings (from utils.HashKey).
	// If lock is non-nil, the delete is executed under the table lock.
	// Returns the number of rows affected and any error.
	DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)

	// UpsertRows performs an upsert (REPLACE INTO ... VALUES) synchronously.
	// The rows are LogicalRow structs containing the row images.
	// If lock is non-nil, the upsert is executed under the table lock.
	// Returns the number of rows affected and any error.
	UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)

	// Wait blocks until all pending work is complete and all callbacks have been invoked
	Wait(ctx context.Context) error

	// Stops the applier workers
	Stop() error

	// GetTargets returns target information for direct database access.
	// This is used by operations like checksum that need to query targets directly.
	// For SingleTargetApplier, this returns a single target.
	// For ShardedApplier, this returns all shards.
	GetTargets() []Target
}

Applier is an interface for applying rows to one or more target databases. Implementations can apply to a single target (SingleTargetApplier) or fan out to multiple targets based on a hash function (ShardedApplier).

The Applier is responsible for:

  • Batching/splitting rows into optimal write sizes
  • Tracking pending writes
  • Invoking callbacks when writes are complete

func NewSingleTargetForTest added in v0.13.0

func NewSingleTargetForTest(t *testing.T, db *sql.DB) Applier

NewSingleTargetForTest builds a SingleTargetApplier suitable for use as the repl client's applier. The repl client requires a non-nil applier — every memory-comparable PK routes through bufferedMap, which calls Applier.UpsertRows / DeleteKeys. See issue #746.

type ApplierConfig

type ApplierConfig struct {
	Threads         int // number of write threads
	ChunkletMaxRows int
	ChunkletMaxSize int
	Logger          *slog.Logger
	DBConfig        *dbconn.DBConfig
}

func NewApplierDefaultConfig

func NewApplierDefaultConfig() *ApplierConfig

NewApplierDefaultConfig returns a default config for the applier.

func (*ApplierConfig) Validate

func (cfg *ApplierConfig) Validate() error

Validate checks the ApplierConfig for required fields.

type ApplyCallback

type ApplyCallback func(affectedRows int64, err error)

ApplyCallback is invoked when rows have been safely flushed to the target(s). affectedRows is the total number of rows affected across all targets. err is non-nil if there was an error applying the rows.

type LogicalRow

type LogicalRow struct {
	IsDeleted bool
	RowImage  []any
}

LogicalRow represents the current state of a row in the subscription buffer: either the row is deleted, or it has a RowImage that describes it. If there is a RowImage, it needs to be converted into the RowImage of the newTable.
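As a rough illustration of how a buffer of such rows could be turned into the inputs for DeleteKeys and UpsertRows, consider the sketch below. The buffer type and its methods are hypothetical stand-ins (the real bufferedMap is not shown in this document), and logicalRow only mirrors the shape of LogicalRow so the example is self-contained.

```go
package main

import (
	"fmt"
	"sort"
)

// logicalRow mirrors the shape of LogicalRow for this self-contained sketch.
type logicalRow struct {
	isDeleted bool
	rowImage  []any
}

// buffer is a hypothetical stand-in for the subscription buffer: keyed by
// hashed primary key, holding only the latest state for each key.
type buffer map[string]logicalRow

func (b buffer) recordUpsert(key string, image []any) {
	b[key] = logicalRow{rowImage: image}
}

func (b buffer) recordDelete(key string) {
	b[key] = logicalRow{isDeleted: true}
}

// partition splits the buffer into keys to delete and row images to upsert.
func (b buffer) partition() (deleteKeys []string, upserts []logicalRow) {
	keys := make([]string, 0, len(b))
	for k := range b {
		keys = append(keys, k)
	}
	sort.Strings(keys) // deterministic order, for the example only
	for _, k := range keys {
		if b[k].isDeleted {
			deleteKeys = append(deleteKeys, k)
		} else {
			upserts = append(upserts, b[k])
		}
	}
	return deleteKeys, upserts
}

func main() {
	b := buffer{}
	b.recordUpsert("1", []any{1, "old"})
	b.recordUpsert("1", []any{1, "new"}) // overwrites the earlier image
	b.recordDelete("2")
	b.recordUpsert("3", []any{3, "x"})
	b.recordDelete("3") // the later delete wins

	deleteKeys, upserts := b.partition()
	fmt.Println("delete:", deleteKeys)
	fmt.Println("upsert:", len(upserts), "row(s)")
}
```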

type ShardedApplier

type ShardedApplier struct {
	sync.Mutex
	// contains filtered or unexported fields
}

ShardedApplier applies rows to multiple target databases based on a Vitess-style vindex. It extracts a specific column value from each row, applies a hash function to it, and routes the row to the appropriate shard based on the hash value and key ranges.

The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.

func NewShardedApplier

func NewShardedApplier(targets []Target, cfg *ApplierConfig) (*ShardedApplier, error)

NewShardedApplier creates a new ShardedApplier with multiple target databases.

The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.

func (*ShardedApplier) Apply

func (a *ShardedApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error

Apply sends rows to be written to the appropriate target shards. Rows are distributed across shards based on the sharding column and hash function configured in the chunk's Table.ShardingColumn and Table.HashFunc.

func (*ShardedApplier) DeleteKeys

func (a *ShardedApplier) DeleteKeys(ctx context.Context, sourceTable, _ *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)

DeleteKeys deletes rows by their key values synchronously, broadcasting to all shards. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, operations are executed under table locks (one per shard).

Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). For this reason we can't extract the vindex value, and must instead broadcast the deletes to all shards. The vindex value is considered immutable, and we will error if it changes on an update.

Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.

func (*ShardedApplier) GetTargets

func (a *ShardedApplier) GetTargets() []Target

GetTargets returns the target database configurations for direct access. This is used by operations like checksum that need to query targets directly.

func (*ShardedApplier) Start

func (a *ShardedApplier) Start(ctx context.Context) error

Start initializes all shard workers and begins processing. This method is idempotent and can restart the applier after Stop() is called.

Lifecycle: callers MUST call Stop() to terminate the per-shard write workers and the single feedbackCoordinator. Cancelling the ctx passed here does NOT by itself shut down the goroutine pipeline — it only aborts in-flight writes. Workers for a shard exit when its chunkletBuffer is closed (by Stop), and the coordinator exits when every shard's chunkletCompletions has been closed (by the last worker's defer per shard). Failing to call Stop() will leak goroutines.

func (*ShardedApplier) Stop

func (a *ShardedApplier) Stop() error

Stop signals the applier to shut down gracefully.

func (*ShardedApplier) UpsertRows

func (a *ShardedApplier) UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)

UpsertRows performs upserts synchronously, distributing across shards. The rows are LogicalRow structs containing inline row images from the binlog. Each shard issues `REPLACE INTO target (cols) VALUES (...)`; the REPLACE semantics — and their eventual-consistency implications for callers — are documented on SingleTargetApplier.UpsertRows. The short version: REPLACE may delete rows whose PKs are not in the `rows` argument (via the unique-key conflict resolution) and those rows are re-inserted by their own events in subsequent batches.

If lock is non-nil, operations are executed under table locks (one per shard).

Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). For this reason we could get into trouble if a PK update mutated the vindex column: we would only see the last operation (modification) and would not know to DELETE from one of the shards.

To address this, we consider the vindex column immutable. The replication client is told to error on any update to it, and the entire operation is canceled.

This is likely not too big of a limitation, as Vitess itself recommends that vindex columns be immutable. If it turns out to be a problem, we can revisit tracking by other columns later.

Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.

func (*ShardedApplier) Wait

func (a *ShardedApplier) Wait(ctx context.Context) error

Wait blocks until all pending work is complete and all callbacks have been invoked.

type SingleTargetApplier

type SingleTargetApplier struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SingleTargetApplier applies rows to a single target database. It internally splits rows into chunklets for optimal batching and tracks completion to invoke callbacks when all chunklets for a set of rows are done.

func NewSingleTargetApplier

func NewSingleTargetApplier(target Target, cfg *ApplierConfig) (*SingleTargetApplier, error)

NewSingleTargetApplier creates a new SingleTargetApplier.

func (*SingleTargetApplier) Apply

func (a *SingleTargetApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error

Apply sends rows to be written to the target database.

func (*SingleTargetApplier) DeleteKeys

func (a *SingleTargetApplier) DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)

DeleteKeys deletes rows by their key values synchronously. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, the delete is executed under the table lock.

func (*SingleTargetApplier) GetTargets

func (a *SingleTargetApplier) GetTargets() []Target

GetTargets returns the target database configuration for direct access. This is used by operations like checksum that need to query targets directly.

func (*SingleTargetApplier) Start

func (a *SingleTargetApplier) Start(ctx context.Context) error

Start initializes the applier's async write workers and begins processing. This does not control the synchronous methods like UpsertRows/DeleteKeys. This method is idempotent - calling it multiple times is safe.

Lifecycle: callers MUST call Stop() to terminate the write workers and feedbackCoordinator. Cancelling the ctx passed here does NOT by itself shut down the goroutine pipeline — it only aborts in-flight writes. Workers exit when chunkletBuffer is closed (by Stop), and the coordinator exits when chunkletCompletions is closed (by the last worker's defer). Failing to call Stop() will leak goroutines.
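The lifecycle rule can be illustrated with a stripped-down worker pool. This is a sketch under stated assumptions rather than the applier's code: pool, startPool, stop, and run are hypothetical, and the channels merely stand in for chunkletBuffer and chunkletCompletions. It needs Go 1.19 or later for the atomic types and at least one worker.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

type pool struct {
	buffer      chan int      // stands in for chunkletBuffer
	completions chan int      // stands in for chunkletCompletions
	done        chan struct{} // closed when the coordinator exits
	processed   atomic.Int64
	stopOnce    sync.Once
}

func startPool(ctx context.Context, workers int) *pool {
	p := &pool{
		buffer:      make(chan int),
		completions: make(chan int),
		done:        make(chan struct{}),
	}
	var alive atomic.Int32
	alive.Store(int32(workers))
	for i := 0; i < workers; i++ {
		go func() {
			// The last worker to exit closes completions.
			defer func() {
				if alive.Add(-1) == 0 {
					close(p.completions)
				}
			}()
			// The loop ends only when buffer is closed, not when ctx is cancelled.
			for item := range p.buffer {
				if ctx.Err() != nil {
					continue // write aborted, but the worker keeps running
				}
				p.completions <- item
			}
		}()
	}
	go func() { // coordinator: exits when completions is closed
		defer close(p.done)
		for range p.completions {
			p.processed.Add(1)
		}
	}()
	return p
}

// stop closes the buffer and waits for the pipeline to drain. It is idempotent.
func (p *pool) stop() {
	p.stopOnce.Do(func() { close(p.buffer) })
	<-p.done
}

// run pushes items through a two-worker pool and returns how many were written.
func run(items int, cancelFirst bool) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	p := startPool(ctx, 2)
	if cancelFirst {
		cancel()
	}
	for i := 0; i < items; i++ {
		p.buffer <- i // still accepted after cancel: workers are alive
	}
	p.stop()
	return p.processed.Load()
}

func main() {
	fmt.Println("written with live ctx:", run(5, false))
	fmt.Println("written after cancel:", run(5, true))
}
```

After cancellation the workers still receive items and simply skip the write, so the goroutines outlive the context. Only stop() lets them and the coordinator exit.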

func (*SingleTargetApplier) Stop

func (a *SingleTargetApplier) Stop() error

Stop signals the applier to shut down gracefully. This does not control the synchronous methods like UpsertRows/DeleteKeys, which can continue after Stop() is called. This method is idempotent - calling it multiple times is safe.

func (*SingleTargetApplier) UpsertRows

func (a *SingleTargetApplier) UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)

UpsertRows performs an upsert (REPLACE INTO ... VALUES) synchronously. The rows are LogicalRow structs containing inline row images from the binlog. If lock is non-nil, the upsert is executed under the table lock.

REPLACE semantics, and why we use them:

MySQL's `REPLACE INTO target (cols) VALUES (...)` treats each value tuple as an INSERT, except that for any row in `target` that conflicts with the new row on PRIMARY KEY *or any UNIQUE index*, the old row is deleted before the new row is inserted. Per the docs, conflicts on multiple unique indexes can lead to multiple deletions for a single new row.

Two implications matter for callers reading this code:

  1. A single REPLACE may delete rows whose PKs are *not* in the `rows` argument. If row B's image collides on a unique key with some other row A currently in the destination (because A was the previous holder of that unique value), REPLACE deletes A while inserting B. A is then transiently missing from the destination until its own event arrives in a later flush (or a later batch in the same flush) and re-inserts it. This is what restores the order-independence the pre-#821 deltaMap had with `REPLACE INTO ... SELECT`. See block/spirit#847.

  2. Eventual consistency. Between the moment REPLACE deletes A and the moment A's image is re-applied, the destination is not a valid snapshot of source — it has fewer rows. Spirit relies on the bufferedMap being an *up-to-date and disjoint* representation of pending changes (each PK appears at most once, holding the latest row image) so that every transiently-deleted row will be re-inserted as flushes progress. The destination converges back to source's current state once the last unflushed event for each affected PK has been applied. The post-cutover checksum (with `FixDifferences=true`) is the backstop that catches any divergence that survives.

We supply inline row images rather than `REPLACE INTO ... SELECT FROM source`, so the read-after-commit race that motivated #746 does not apply.

func (*SingleTargetApplier) Wait

func (a *SingleTargetApplier) Wait(ctx context.Context) error

Wait blocks until all pending work is complete and all callbacks have been invoked.

type Target

type Target struct {
	DB       *sql.DB
	Config   *mysql.Config
	KeyRange string // Vitess-style key range: "-80", "80-", "80-c0", or "0" for unsharded
}

Target represents a shard target with its database connection, configuration, and key range. Key ranges are expressed as Vitess-style strings (e.g., "-80", "80-", "80-c0"). An empty string or "0" means all key space (unsharded).
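For readers unfamiliar with the key range notation, the following sketch tests whether a keyspace ID falls inside a range string. keyRangeContains is a hypothetical helper, not part of this package. It assumes the usual Vitess convention of hex-encoded bounds with an inclusive start and exclusive end, and it leaves out the step that hashes the sharding column into a keyspace ID.

```go
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
	"strings"
)

// keyRangeContains reports whether keyspaceID falls inside a Vitess-style
// key range such as "-80", "80-", or "80-c0". The start is inclusive and the
// end is exclusive; "" and "0" cover the whole key space.
func keyRangeContains(keyRange string, keyspaceID []byte) (bool, error) {
	if keyRange == "" || keyRange == "0" {
		return true, nil
	}
	startHex, endHex, ok := strings.Cut(keyRange, "-")
	if !ok {
		return false, fmt.Errorf("invalid key range %q", keyRange)
	}
	start, err := hex.DecodeString(startHex)
	if err != nil {
		return false, fmt.Errorf("invalid start in key range %q: %w", keyRange, err)
	}
	end, err := hex.DecodeString(endHex)
	if err != nil {
		return false, fmt.Errorf("invalid end in key range %q: %w", keyRange, err)
	}
	if bytes.Compare(keyspaceID, start) < 0 {
		return false, nil
	}
	// An empty end means the range is unbounded above.
	if len(end) > 0 && bytes.Compare(keyspaceID, end) >= 0 {
		return false, nil
	}
	return true, nil
}

func main() {
	ksid := []byte{0x9a, 0x12}
	for _, kr := range []string{"-80", "80-c0", "c0-", "0"} {
		ok, err := keyRangeContains(kr, ksid)
		fmt.Printf("%q contains %x: %v (err=%v)\n", kr, ksid, ok, err)
	}
}
```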
