package applier

Published: Mar 4, 2026 License: Apache-2.0

README

Appliers

Appliers are responsible for writing rows to one or more targets and are used by the copier/subscription components.

  • SingleTargetApplier: For standard (non-sharded) migrations to a single target database.
  • ShardedApplier: For migrations to Vitess-style sharded databases, where rows are distributed across multiple targets based on a hash function.

Both implementations share a common interface but have different internal architectures to handle their respective use cases efficiently.

History

The original implementation of Spirit relied on statements such as INSERT .. SELECT and REPLACE INTO, sending as much work as possible back to MySQL for processing. We now refer to this implementation as the unbuffered algorithm.

The unbuffered algorithm has the advantage that there are fewer edge cases that can corrupt data (such as accidental charset/timezone conversions), and it does not send as much data across the network (which costs CPU cycles on both MySQL and Spirit). It has two downsides:

  1. INSERT .. SELECT statements are locking, and do not use MVCC on the SELECT side.
  2. It cannot be used to ship data between MySQL servers, for example in move/copy operations.

The first downside can be mitigated by using smaller chunks to yield the lock periodically, but there is no option to address the second.

Appliers were created to support an algorithm which we refer to as buffered, which is an implementation of DBLog. Changes are extracted from the source table(s), and then sent to the applier to be loaded into an underlying target.

The unbuffered algorithm remains the default for schema changes, and we have no current plans to change it. For move operations only the buffered algorithm is supported.

Why an Applier Abstraction?

Applier is an abstraction which encompasses all changes that can be applied to a target. The advantage of having an interface for this is:

  1. Complex Scenarios: Abstract away resharding operations and other complex topologies.
  2. Future Targets: Support non-MySQL targets or targets with different performance characteristics.

We do not intend for Spirit to support schema changes on anything other than MySQL, but it could in the future be possible to use it to synchronize data between MySQL and a downstream target such as PostgreSQL or Iceberg.

The applier layer provides several critical functions:

  1. Optimal Batching: Rows are split into "chunklets" that respect both MySQL's max_allowed_packet limit and optimal write sizes.
  2. Parallel Processing: Multiple write workers fan out and process chunklets concurrently for ideal use of group commit.
  3. Async Feedback: Callers are notified via callbacks when writes complete, allowing the copier to advance its watermark.
  4. Mixed Operations: Supports both async bulk copying (Apply) and synchronous operations (DeleteKeys, UpsertRows) needed by the subscription.

Without the applier layer, the copier would need to handle all of this complexity itself, making the code harder to maintain and test. The copier is agnostic to sharded migrations.

Core Concepts

Chunklets

A "chunklet" is an internal batching unit used by appliers. This is different from the "chunk" concept in pkg/table/, which refers to the range of rows the copier reads from the source table.

When the copier calls Apply() with a batch of rows (typically from one chunk), the applier splits those rows into smaller "chunklets" for writing. Each chunklet defaults to:

  • Row count: Maximum 1,000 rows per chunklet
  • Size: Maximum 1 MiB of estimated data per chunklet

The size limit exists because MySQL's max_allowed_packet defaults to 64 MiB, but we use a conservative 1 MiB threshold to ensure we never approach that limit even with very wide rows. The row count limit provides a reasonable upper bound for tables with narrow rows.

Important: A single row can exceed 1 MiB by itself. In this edge case, the row will be placed in its own chunklet regardless of size, relying on max_allowed_packet being large enough. This is rare in practice.
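The splitting rules above can be sketched as follows. This is a simplified illustration, not the package's actual internals: the `chunklet` type, `estimateSize` helper, and limit values are assumptions.

```go
package main

import "fmt"

// chunklet is a batch of rows bounded by both row count and estimated size.
type chunklet struct {
	rows [][]any
	size int // estimated bytes
}

// estimateSize is a stand-in for whatever row-size estimation the applier uses.
func estimateSize(row []any) int {
	size := 0
	for _, col := range row {
		size += len(fmt.Sprint(col))
	}
	return size
}

// splitIntoChunklets splits rows so that each chunklet stays under both
// limits. A single oversized row still gets its own chunklet, relying on
// max_allowed_packet being large enough.
func splitIntoChunklets(rows [][]any, maxRows, maxBytes int) []chunklet {
	var out []chunklet
	cur := chunklet{}
	for _, row := range rows {
		rs := estimateSize(row)
		// Flush the current chunklet if adding this row would exceed a limit.
		if len(cur.rows) > 0 && (len(cur.rows)+1 > maxRows || cur.size+rs > maxBytes) {
			out = append(out, cur)
			cur = chunklet{}
		}
		cur.rows = append(cur.rows, row)
		cur.size += rs
	}
	if len(cur.rows) > 0 {
		out = append(out, cur)
	}
	return out
}

func main() {
	rows := [][]any{{1, "aaaa"}, {2, "bbbb"}, {3, "cccc"}}
	// With maxRows=2, three rows split into two chunklets.
	for i, c := range splitIntoChunklets(rows, 2, 1<<20) {
		fmt.Printf("chunklet %d: %d rows, ~%d bytes\n", i, len(c.rows), c.size)
	}
}
```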

Async vs Sync Operations

The applier interface provides both asynchronous and synchronous methods:

Asynchronous (used by copier):

  • Apply(ctx, chunk, rows, callback): Queues rows for writing and returns immediately. The callback is invoked when all rows have been written.

Synchronous (used by subscription):

  • DeleteKeys(ctx, sourceTable, targetTable, keys, lock): Deletes rows by primary key and waits for completion.
  • UpsertRows(ctx, sourceTable, targetTable, rows, lock): Upserts rows and waits for completion.

This distinction exists because:

  • The copier processes large batches of rows and benefits from async processing with callbacks to advance its watermark.
  • The subscription processes individual binlog events and needs immediate confirmation that changes have been applied before advancing the binlog position.

Callbacks and Feedback

When the copier calls Apply(), it provides a callback function:

callback := func(affectedRows int64, err error) {
    if err != nil {
        // Handle error
        return
    }
    // All rows have been written, advance watermark
    chunker.Feedback(affectedRows)
}
applier.Apply(ctx, chunk, rows, callback)

The applier tracks all pending work internally and invokes the callback only when:

  1. All chunklets for that batch have been written.
  2. OR an error occurs in any chunklet.

This allows the copier to continue reading and queuing more work without blocking, while still maintaining correctness by only advancing the watermark after writes complete.

Implementation Details

For detailed information about the SingleTargetApplier and ShardedApplier implementations, see the inline code documentation in single_target.go and sharded.go.

The ShardedApplier has an important limitation: it only tracks changes by PRIMARY KEY, not by sharding column (vindex). This means DeleteKeys and UpsertRows must broadcast to all shards, and the vindex column must be immutable. See sharded.go for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Applier

type Applier interface {
	// Start initializes the applier and starts its workers
	Start(ctx context.Context) error

	// Apply sends rows to be written to the target(s).
	// The chunk parameter provides metadata about the source table and target table.
	// The rows parameter contains the actual row data to be written.
	// The callback is invoked when all rows are safely flushed.
	//
	// For the copier: callback will call chunker.Feedback()
	// For the subscription: callback will update binlog coordinates
	Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error

	// DeleteKeys deletes rows by their key values synchronously.
	// The keys are hashed key strings (from utils.HashKey).
	// If lock is non-nil, the delete is executed under the table lock.
	// Returns the number of rows affected and any error.
	DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)

	// UpsertRows performs an upsert (INSERT ... ON DUPLICATE KEY UPDATE) synchronously.
	// The rows are LogicalRow structs containing the row images.
	// If lock is non-nil, the upsert is executed under the table lock.
	// Returns the number of rows affected and any error.
	UpsertRows(ctx context.Context, sourceTable, targetTable *table.TableInfo, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)

	// Wait blocks until all pending work is complete and all callbacks have been invoked
	Wait(ctx context.Context) error

	// Stops the applier workers
	Stop() error

	// GetTargets returns target information for direct database access.
	// This is used by operations like checksum that need to query targets directly.
	// For SingleTargetApplier, this returns a single target.
	// For ShardedApplier, this returns all shards.
	GetTargets() []Target
}

Applier is an interface for applying rows to one or more target databases. Implementations can apply to a single target (SingleTargetApplier) or fan out to multiple targets based on a hash function (ShardedApplier).

The Applier is responsible for:

  • Batching/splitting rows into optimal write sizes
  • Tracking pending writes
  • Invoking callbacks when writes are complete

type ApplierConfig

type ApplierConfig struct {
	Threads         int // number of write threads
	ChunkletMaxRows int
	ChunkletMaxSize int
	Logger          *slog.Logger
	DBConfig        *dbconn.DBConfig
}

func NewApplierDefaultConfig

func NewApplierDefaultConfig() *ApplierConfig

NewApplierDefaultConfig returns a default config for the applier.

func (*ApplierConfig) Validate

func (cfg *ApplierConfig) Validate() error

Validate checks the ApplierConfig for required fields.

type ApplyCallback

type ApplyCallback func(affectedRows int64, err error)

ApplyCallback is invoked when rows have been safely flushed to the target(s). affectedRows is the total number of rows affected across all targets. err is non-nil if there was an error applying the rows.

type LogicalRow

type LogicalRow struct {
	IsDeleted bool
	RowImage  []any
}

LogicalRow represents the current state of a row in the subscription buffer. This could be that it is deleted, or that it has RowImage that describes it. If there is a RowImage, then it needs to be converted into the RowImage of the newTable.
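As a sketch of how a subscription buffer might use LogicalRow, successive binlog events for the same key can be collapsed so only the final state per key reaches the applier. The buffer map and `applyEvent` helper here are hypothetical illustrations:

```go
package main

import "fmt"

// LogicalRow mirrors the type above: either deleted, or described by RowImage.
type LogicalRow struct {
	IsDeleted bool
	RowImage  []any
}

// applyEvent collapses a binlog event into the buffered state for a key.
// Only the last state per key needs to be flushed to the applier.
func applyEvent(buf map[string]LogicalRow, key string, deleted bool, image []any) {
	if deleted {
		buf[key] = LogicalRow{IsDeleted: true} // a later upsert would overwrite this
	} else {
		buf[key] = LogicalRow{RowImage: image}
	}
}

func main() {
	buf := map[string]LogicalRow{}
	applyEvent(buf, "pk=1", false, []any{1, "alice"})  // INSERT
	applyEvent(buf, "pk=1", false, []any{1, "alicia"}) // UPDATE overwrites
	applyEvent(buf, "pk=2", false, []any{2, "bob"})    // INSERT
	applyEvent(buf, "pk=2", true, nil)                 // DELETE wins
	fmt.Println(buf["pk=1"].RowImage, buf["pk=2"].IsDeleted)
	// → [1 alicia] true
}
```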

type ShardedApplier

type ShardedApplier struct {
	sync.Mutex
	// contains filtered or unexported fields
}

ShardedApplier applies rows to multiple target databases based on a Vitess-style vindex. It extracts a specific column value from each row, applies a hash function to it, and routes the row to the appropriate shard based on the hash value and key ranges.

The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.

func NewShardedApplier

func NewShardedApplier(targets []Target, cfg *ApplierConfig) (*ShardedApplier, error)

NewShardedApplier creates a new ShardedApplier with multiple target databases.

The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.

func (*ShardedApplier) Apply

func (a *ShardedApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error

Apply sends rows to be written to the appropriate target shards. Rows are distributed across shards based on the sharding column and hash function configured in the chunk's Table.ShardingColumn and Table.HashFunc.

func (*ShardedApplier) DeleteKeys

func (a *ShardedApplier) DeleteKeys(ctx context.Context, sourceTable, _ *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)

DeleteKeys deletes rows by their key values synchronously, broadcasting to all shards. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, operations are executed under table locks (one per shard).

Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). For this reason we can't extract the vindex value, and must instead broadcast the deletes to all shards. The vindex value is considered immutable, and we will error if it changes on an update.

Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.

func (*ShardedApplier) GetTargets

func (a *ShardedApplier) GetTargets() []Target

GetTargets returns the target database configurations for direct access. This is used by operations like checksum that need to query targets directly.

func (*ShardedApplier) Start

func (a *ShardedApplier) Start(ctx context.Context) error

Start initializes all shard workers and begins processing. This method is idempotent and can restart the applier after Stop() is called.

func (*ShardedApplier) Stop

func (a *ShardedApplier) Stop() error

Stop signals the applier to shut down gracefully

func (*ShardedApplier) UpsertRows

func (a *ShardedApplier) UpsertRows(ctx context.Context, sourceTable, _ *table.TableInfo, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)

UpsertRows performs upserts synchronously, distributing across shards. The rows are LogicalRow structs containing the row images. If lock is non-nil, operations are executed under table locks (one per shard).

Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). For this reason we could get in trouble if there was a PK update that mutated the vindex column. This is because we would only see the last operation (modification) and not know to DELETE from one of the shards.

The way we address this is to consider the vindex column immutable. The replication client is told that it should error if there are any updates to it, and the entire operation is canceled.

This is likely not too big of a limitation, as Vitess itself recommends that vindex columns be immutable. If it turns out to be a problem, we can revisit tracking by other columns later.

Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.

func (*ShardedApplier) Wait

func (a *ShardedApplier) Wait(ctx context.Context) error

Wait blocks until all pending work is complete and all callbacks have been invoked

type SingleTargetApplier

type SingleTargetApplier struct {
	sync.Mutex
	// contains filtered or unexported fields
}

SingleTargetApplier applies rows to a single target database. It internally splits rows into chunklets for optimal batching and tracks completion to invoke callbacks when all chunklets for a set of rows are done.

func NewSingleTargetApplier

func NewSingleTargetApplier(target Target, cfg *ApplierConfig) (*SingleTargetApplier, error)

NewSingleTargetApplier creates a new SingleTargetApplier

func (*SingleTargetApplier) Apply

func (a *SingleTargetApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error

Apply sends rows to be written to the target database

func (*SingleTargetApplier) DeleteKeys

func (a *SingleTargetApplier) DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)

DeleteKeys deletes rows by their key values synchronously. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, the delete is executed under the table lock.

func (*SingleTargetApplier) GetTargets

func (a *SingleTargetApplier) GetTargets() []Target

GetTargets returns the target database configuration for direct access. This is used by operations like checksum that need to query targets directly.

func (*SingleTargetApplier) Start

func (a *SingleTargetApplier) Start(ctx context.Context) error

Start initializes the applier's async write workers and begins processing. This does not control the synchronous methods like UpsertRows/DeleteKeys. This method is idempotent - calling it multiple times is safe.

func (*SingleTargetApplier) Stop

func (a *SingleTargetApplier) Stop() error

Stop signals the applier to shut down gracefully. This does not control the synchronous methods like UpsertRows/DeleteKeys, which can continue after Stop() is called. This method is idempotent - calling it multiple times is safe.

func (*SingleTargetApplier) UpsertRows

func (a *SingleTargetApplier) UpsertRows(ctx context.Context, sourceTable, targetTable *table.TableInfo, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)

UpsertRows performs an upsert (INSERT ... ON DUPLICATE KEY UPDATE) synchronously. The rows are LogicalRow structs containing the row images. If lock is non-nil, the upsert is executed under the table lock.

func (*SingleTargetApplier) Wait

func (a *SingleTargetApplier) Wait(ctx context.Context) error

Wait blocks until all pending work is complete and all callbacks have been invoked

type Target

type Target struct {
	DB       *sql.DB
	Config   *mysql.Config
	KeyRange string // Vitess-style key range: "-80", "80-", "80-c0", or "0" for unsharded
}

Target represents a shard target with its database connection, configuration, and key range. Key ranges are expressed as Vitess-style strings (e.g., "-80", "80-", "80-c0"). An empty string or "0" means all key space (unsharded).
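A minimal sketch of how a Vitess-style key range might be checked against a keyspace ID (the hex-encoded hash of the sharding column). The `inKeyRange` helper is an illustrative assumption, and it relies on lowercase hex strings comparing correctly lexicographically:

```go
package main

import (
	"fmt"
	"strings"
)

// inKeyRange reports whether a keyspace ID (lowercase hex-encoded hash
// prefix) falls inside a Vitess-style key range such as "-80", "80-",
// or "80-c0". An empty range or "0" matches everything (unsharded).
func inKeyRange(keyspaceID, keyRange string) bool {
	if keyRange == "" || keyRange == "0" {
		return true
	}
	start, end, _ := strings.Cut(keyRange, "-")
	// Lowercase hex strings order the same lexicographically as the
	// byte values they encode, so plain string comparison suffices here.
	if start != "" && keyspaceID < start {
		return false
	}
	if end != "" && keyspaceID >= end {
		return false
	}
	return true
}

func main() {
	fmt.Println(inKeyRange("7fd2", "-80"))   // → true  (0x7f... is below 0x80)
	fmt.Println(inKeyRange("93aa", "80-c0")) // → true  (0x93... is in [0x80, 0xc0))
	fmt.Println(inKeyRange("c1aa", "80-c0")) // → false (0xc1... is past 0xc0)
}
```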
