Documentation ¶
Index ¶
- type Applier
- type ApplierConfig
- type ApplyCallback
- type LogicalRow
- type ShardedApplier
- func (a *ShardedApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
- func (a *ShardedApplier) DeleteKeys(ctx context.Context, sourceTable, _ *table.TableInfo, keys []string, ...) (int64, error)
- func (a *ShardedApplier) GetTargets() []Target
- func (a *ShardedApplier) Start(ctx context.Context) error
- func (a *ShardedApplier) Stop() error
- func (a *ShardedApplier) UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, ...) (int64, error)
- func (a *ShardedApplier) Wait(ctx context.Context) error
- type SingleTargetApplier
- func (a *SingleTargetApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
- func (a *SingleTargetApplier) DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, ...) (int64, error)
- func (a *SingleTargetApplier) GetTargets() []Target
- func (a *SingleTargetApplier) Start(ctx context.Context) error
- func (a *SingleTargetApplier) Stop() error
- func (a *SingleTargetApplier) UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, ...) (int64, error)
- func (a *SingleTargetApplier) Wait(ctx context.Context) error
- type Target
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Applier ¶
type Applier interface {
// Start initializes the applier and starts its workers
Start(ctx context.Context) error
// Apply sends rows to be written to the target(s).
// The chunk parameter provides metadata about the source table and target table.
// The rows parameter contains the actual row data to be written.
// The callback is invoked when all rows are safely flushed.
//
// For the copier: callback will call chunker.Feedback()
// For the subscription: callback will update binlog coordinates
Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
// DeleteKeys deletes rows by their key values synchronously.
// The keys are hashed key strings (from utils.HashKey).
// If lock is non-nil, the delete is executed under the table lock.
// Returns the number of rows affected and any error.
DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)
// UpsertRows performs an upsert (INSERT ... ON DUPLICATE KEY UPDATE) synchronously.
// The rows are LogicalRow structs containing the row images.
// If lock is non-nil, the upsert is executed under the table lock.
// Returns the number of rows affected and any error.
UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)
// Wait blocks until all pending work is complete and all callbacks have been invoked
Wait(ctx context.Context) error
// Stop stops the applier workers.
Stop() error
// GetTargets returns target information for direct database access.
// This is used by operations like checksum that need to query targets directly.
// For SingleTargetApplier, this returns a single target.
// For ShardedApplier, this returns all shards.
GetTargets() []Target
}
Applier is an interface for applying rows to one or more target databases. Implementations can apply to a single target (SingleTargetApplier) or fan out to multiple targets based on a hash function (ShardedApplier).
The Applier is responsible for:
- Batching/splitting rows into optimal write sizes
- Tracking pending writes
- Invoking callbacks when writes are complete
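The three responsibilities listed above can be illustrated in miniature. This is a minimal sketch, not the package's implementation: `splitChunklets` and `pendingWork` are hypothetical names, the callback signature is assumed from the ApplyCallback description (affected rows plus an error), and only a row-count limit is modelled (the real config also has ChunkletMaxSize).

```go
package main

import (
	"fmt"
	"sync"
)

// splitChunklets (hypothetical) cuts rows into batches of at most maxRows rows.
func splitChunklets(rows [][]any, maxRows int) [][][]any {
	if maxRows < 1 {
		maxRows = 1
	}
	var out [][][]any
	for start := 0; start < len(rows); start += maxRows {
		end := start + maxRows
		if end > len(rows) {
			end = len(rows)
		}
		out = append(out, rows[start:end])
	}
	return out
}

// pendingWork (hypothetical) tracks the chunklets that are still in flight
// for one Apply call and fires the callback once, after the last one is done.
type pendingWork struct {
	mu        sync.Mutex
	remaining int
	affected  int64
	firstErr  error
	callback  func(affectedRows int64, err error) // assumed callback shape
}

// done records the result of one chunklet write.
func (p *pendingWork) done(affected int64, err error) {
	p.mu.Lock()
	p.remaining--
	p.affected += affected
	if err != nil && p.firstErr == nil {
		p.firstErr = err
	}
	finished := p.remaining == 0
	total, ferr := p.affected, p.firstErr
	p.mu.Unlock()
	if finished {
		p.callback(total, ferr)
	}
}

func main() {
	rows := [][]any{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}, {5, "e"}}
	chunklets := splitChunklets(rows, 2)
	work := &pendingWork{
		remaining: len(chunklets),
		callback: func(n int64, err error) {
			fmt.Println("all chunklets flushed, rows:", n, "err:", err)
		},
	}
	var wg sync.WaitGroup
	for _, c := range chunklets {
		wg.Add(1)
		go func(c [][]any) { // stands in for a write worker
			defer wg.Done()
			work.done(int64(len(c)), nil)
		}(c)
	}
	wg.Wait()
}
```

The callback fires exactly once per set of rows, regardless of the order in which the workers finish their chunklets.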
func NewSingleTargetForTest ¶ added in v0.13.0
NewSingleTargetForTest builds a SingleTargetApplier suitable for use as the repl client's applier. The repl client requires a non-nil applier — every memory-comparable PK routes through bufferedMap, which calls Applier.UpsertRows / DeleteKeys. See issue #746.
type ApplierConfig ¶
type ApplierConfig struct {
Threads int // number of write threads
ChunkletMaxRows int
ChunkletMaxSize int
Logger *slog.Logger
DBConfig *dbconn.DBConfig
}
func NewApplierDefaultConfig ¶
func NewApplierDefaultConfig() *ApplierConfig
NewApplierDefaultConfig returns a default config for the applier.
func (*ApplierConfig) Validate ¶
func (cfg *ApplierConfig) Validate() error
Validate checks the ApplierConfig for required fields.
type ApplyCallback ¶
ApplyCallback is invoked when rows have been safely flushed to the target(s). affectedRows is the total number of rows affected across all targets. err is non-nil if there was an error applying the rows.
type LogicalRow ¶
LogicalRow represents the current state of a row in the subscription buffer: either the row is deleted, or it has a RowImage that describes it. If there is a RowImage, it needs to be converted into the RowImage of the newTable.
type ShardedApplier ¶
ShardedApplier applies rows to multiple target databases based on a Vitess-style vindex. It extracts a specific column value from each row, applies a hash function to it, and routes the row to the appropriate shard based on the hash value and key ranges.
The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.
func NewShardedApplier ¶
func NewShardedApplier(targets []Target, cfg *ApplierConfig) (*ShardedApplier, error)
NewShardedApplier creates a new ShardedApplier with multiple target databases.
The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.
func (*ShardedApplier) Apply ¶
func (a *ShardedApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
Apply sends rows to be written to the appropriate target shards. Rows are distributed across shards based on the sharding column and hash function configured in the chunk's Table.ShardingColumn and Table.HashFunc.
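The routing step described above (extract the sharding column, hash it, pick the shard whose key range covers the hash) might look roughly like the following. This is a sketch under stated assumptions: `shardRange`, `hashValue`, and `routeRows` are hypothetical names, the sharding column is addressed by index, and FNV-1a stands in for whatever hash function TableInfo.HashFunc actually supplies. It is not the Vitess hash vindex.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardRange (hypothetical) is a half-open range [start, end) over 64-bit
// hashes. end == 0 is used here to mean "no upper bound".
type shardRange struct {
	start, end uint64
}

func (r shardRange) contains(h uint64) bool {
	return h >= r.start && (r.end == 0 || h < r.end)
}

// hashValue is a stand-in hash (FNV-1a over the printed value).
func hashValue(v any) uint64 {
	h := fnv.New64a()
	h.Write([]byte(fmt.Sprint(v)))
	return h.Sum64()
}

// routeRows groups rows by the index of the shard whose range covers the
// hash of the sharding column.
func routeRows(rows [][]any, shardingCol int, ranges []shardRange) (map[int][][]any, error) {
	out := make(map[int][][]any)
	for _, row := range rows {
		if shardingCol < 0 || shardingCol >= len(row) {
			return nil, fmt.Errorf("row has no column %d", shardingCol)
		}
		h := hashValue(row[shardingCol])
		shard := -1
		for i, r := range ranges {
			if r.contains(h) {
				shard = i
				break
			}
		}
		if shard < 0 {
			return nil, fmt.Errorf("no shard covers hash %x", h)
		}
		out[shard] = append(out[shard], row)
	}
	return out, nil
}

func main() {
	// Two shards splitting the hash space in half, like "-80" and "80-".
	ranges := []shardRange{{start: 0, end: 1 << 63}, {start: 1 << 63, end: 0}}
	rows := [][]any{{1, "a"}, {2, "b"}, {3, "c"}}
	routed, err := routeRows(rows, 0, ranges)
	if err != nil {
		panic(err)
	}
	for shard, rs := range routed {
		fmt.Printf("shard %d gets %d row(s)\n", shard, len(rs))
	}
}
```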
func (*ShardedApplier) DeleteKeys ¶
func (a *ShardedApplier) DeleteKeys(ctx context.Context, sourceTable, _ *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)
DeleteKeys deletes rows by their key values synchronously, broadcasting to all shards. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, operations are executed under table locks (one per shard).
Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). For this reason we can't extract the vindex value, and must instead broadcast the deletes to all shards. The vindex value is considered immutable, and we will error if it changes on an update.
Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.
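To make the broadcast behaviour concrete, here is a toy model in which each shard is an in-memory set of keys. The `shard` type and `deleteFromAllShards` are hypothetical and exist only for illustration. Because the caller knows only the key and not which shard holds it, every shard is asked to delete every key, and the affected counts are summed.

```go
package main

import "fmt"

// shard (hypothetical) is an in-memory stand-in for one target database,
// holding the set of keys it currently stores.
type shard map[string]struct{}

// deleteFromAllShards sends every key to every shard and returns the total
// number of rows that were actually removed.
func deleteFromAllShards(shards []shard, keys []string) int64 {
	var affected int64
	for _, s := range shards {
		for _, k := range keys {
			if _, ok := s[k]; ok {
				delete(s, k)
				affected++
			}
		}
	}
	return affected
}

func main() {
	shards := []shard{
		{"k1": {}, "k2": {}},
		{"k3": {}},
	}
	n := deleteFromAllShards(shards, []string{"k2", "k3", "missing"})
	fmt.Println("rows deleted across all shards:", n)
}
```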
func (*ShardedApplier) GetTargets ¶
func (a *ShardedApplier) GetTargets() []Target
GetTargets returns the target database configurations for direct access. This is used by operations like checksum that need to query targets directly.
func (*ShardedApplier) Start ¶
func (a *ShardedApplier) Start(ctx context.Context) error
Start initializes all shard workers and begins processing. This method is idempotent and can restart the applier after Stop() is called.
Lifecycle: callers MUST call Stop() to terminate the per-shard write workers and the single feedbackCoordinator. Cancelling the ctx passed here does NOT by itself shut down the goroutine pipeline — it only aborts in-flight writes. Workers for a shard exit when its chunkletBuffer is closed (by Stop), and the coordinator exits when every shard's chunkletCompletions has been closed (by the last worker's defer per shard). Failing to call Stop() will leak goroutines.
func (*ShardedApplier) Stop ¶
func (a *ShardedApplier) Stop() error
Stop signals the applier to shut down gracefully.
func (*ShardedApplier) UpsertRows ¶
func (a *ShardedApplier) UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)
UpsertRows performs upserts synchronously, distributing across shards. The rows are LogicalRow structs containing inline row images from the binlog. Each shard issues `REPLACE INTO target (cols) VALUES (...)`; the REPLACE semantics — and their eventual-consistency implications for callers — are documented on SingleTargetApplier.UpsertRows. The short version: REPLACE may delete rows whose PKs are not in the `rows` argument (via the unique-key conflict resolution) and those rows are re-inserted by their own events in subsequent batches.
If lock is non-nil, operations are executed under table locks (one per shard).
Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). For this reason we could get in trouble if there was a PK update that mutated the vindex column, because we would only see the last operation (modification) and would not know to DELETE from one of the shards.
We address this by treating the vindex column as immutable. The replication client is told to error if there are any updates to it, and the entire operation is canceled.
This is likely not too big of a limitation, as Vitess itself recommends that vindex columns be immutable. If it turns out to be a problem, we can revisit tracking by other columns later.
Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.
type SingleTargetApplier ¶
SingleTargetApplier applies rows to a single target database. It internally splits rows into chunklets for optimal batching and tracks completion to invoke callbacks when all chunklets for a set of rows are done.
func NewSingleTargetApplier ¶
func NewSingleTargetApplier(target Target, cfg *ApplierConfig) (*SingleTargetApplier, error)
NewSingleTargetApplier creates a new SingleTargetApplier.
func (*SingleTargetApplier) Apply ¶
func (a *SingleTargetApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
Apply sends rows to be written to the target database.
func (*SingleTargetApplier) DeleteKeys ¶
func (a *SingleTargetApplier) DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)
DeleteKeys deletes rows by their key values synchronously. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, the delete is executed under the table lock.
func (*SingleTargetApplier) GetTargets ¶
func (a *SingleTargetApplier) GetTargets() []Target
GetTargets returns the target database configuration for direct access. This is used by operations like checksum that need to query targets directly.
func (*SingleTargetApplier) Start ¶
func (a *SingleTargetApplier) Start(ctx context.Context) error
Start initializes the applier's async write workers and begins processing. This does not control the synchronous methods like UpsertRows/DeleteKeys. This method is idempotent - calling it multiple times is safe.
Lifecycle: callers MUST call Stop() to terminate the write workers and feedbackCoordinator. Cancelling the ctx passed here does NOT by itself shut down the goroutine pipeline — it only aborts in-flight writes. Workers exit when chunkletBuffer is closed (by Stop), and the coordinator exits when chunkletCompletions is closed (by the last worker's defer). Failing to call Stop() will leak goroutines.
func (*SingleTargetApplier) Stop ¶
func (a *SingleTargetApplier) Stop() error
Stop signals the applier to shut down gracefully. This does not control the synchronous methods like UpsertRows/DeleteKeys, which can continue after Stop() is called. This method is idempotent - calling it multiple times is safe.
func (*SingleTargetApplier) UpsertRows ¶
func (a *SingleTargetApplier) UpsertRows(ctx context.Context, mapping *table.ColumnMapping, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)
UpsertRows performs an upsert (REPLACE INTO ... VALUES) synchronously. The rows are LogicalRow structs containing inline row images from the binlog. If lock is non-nil, the upsert is executed under the table lock.
REPLACE semantics, and why we use them:
MySQL's `REPLACE INTO target (cols) VALUES (...)` treats each value tuple as an INSERT, except that for any row in `target` that conflicts with the new row on PRIMARY KEY *or any UNIQUE index*, the old row is deleted before the new row is inserted. Per the docs, conflicts on multiple unique indexes can lead to multiple deletions for a single new row.
Two implications matter for callers reading this code:
- A single REPLACE may delete rows whose PKs are *not* in the `rows` argument. If row B's image collides on a unique key with some other row A currently in the destination (because A was the previous holder of that unique value), REPLACE deletes A while inserting B. A is then transiently missing from the destination until its own event arrives in a later flush (or a later batch in the same flush) and re-inserts it. This is what restores the order-independence the pre-#821 deltaMap had with `REPLACE INTO ... SELECT`. See block/spirit#847.
- Eventual consistency. Between the moment REPLACE deletes A and the moment A's image is re-applied, the destination is not a valid snapshot of source: it has fewer rows. Spirit relies on the bufferedMap being an *up-to-date and disjoint* representation of pending changes (each PK appears at most once, holding the latest row image) so that every transiently-deleted row will be re-inserted as flushes progress. The destination converges back to source's current state once the last unflushed event for each affected PK has been applied. The post-cutover checksum (with `FixDifferences=true`) is the backstop that catches any divergence that survives.
We supply inline row images rather than `REPLACE INTO ... SELECT FROM source`, so the read-after-commit race that motivated #746 does not apply.
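The transient deletion and eventual convergence described above can be reproduced with an in-memory model of REPLACE. This is only a simulation under assumptions: the `row` and `table` types are hypothetical, the table has a primary key plus a single UNIQUE column (`email`), and no real database is involved. The returned count follows MySQL's rule that affected rows for REPLACE is the sum of rows deleted and rows inserted.

```go
package main

import "fmt"

// row (hypothetical) has a primary key and one UNIQUE column.
type row struct {
	pk    int
	email string // modelled as a UNIQUE index
}

// table is an in-memory stand-in for the destination, keyed by primary key.
type table map[int]row

// replace mimics REPLACE INTO: every existing row that conflicts on the
// primary key or on the unique column is deleted, then the new row is
// inserted. It returns deleted + inserted, like MySQL's affected-rows count.
func (t table) replace(r row) int64 {
	var affected int64
	for pk, old := range t {
		if pk == r.pk || old.email == r.email {
			delete(t, pk)
			affected++
		}
	}
	t[r.pk] = r
	return affected + 1
}

func main() {
	// Destination before the flush: A holds "x", B holds "y".
	dest := table{
		1: {pk: 1, email: "x"}, // row A
		2: {pk: 2, email: "y"}, // row B
	}
	// On the source, A moved to "z" and then B took over "x". The buffer
	// holds only the latest image per PK, and B happens to be applied first.
	n := dest.replace(row{pk: 2, email: "x"})
	_, aPresent := dest[1]
	fmt.Println("affected:", n, "A present:", aPresent) // A is transiently gone

	// A's own image arrives later and restores it.
	dest.replace(row{pk: 1, email: "z"})
	fmt.Println("rows after convergence:", len(dest))
}
```

Applying the two images in the opposite order (A first, then B) reaches the same final state without the transient gap, which is the order-independence the text refers to.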
type Target ¶
type Target struct {
DB *sql.DB
Config *mysql.Config
KeyRange string // Vitess-style key range: "-80", "80-", "80-c0", or "0" for unsharded
}
Target represents a shard target with its database connection, configuration, and key range. Key ranges are expressed as Vitess-style strings (e.g., "-80", "80-", "80-c0"). An empty string or "0" means all key space (unsharded).
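As an illustration of how such key-range strings can be interpreted, the sketch below parses them into numeric bounds. It is a simplification under assumptions: `keyRange`, `parseBound`, and `parseKeyRange` are hypothetical names, and bounds are right-padded to 8 bytes and compared as 64-bit integers, whereas Vitess itself compares keyspace IDs as byte strings.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// keyRange (hypothetical) is the half-open interval [start, end) over 64-bit
// hashes. hasEnd is false when the range has no upper bound.
type keyRange struct {
	start  uint64
	end    uint64
	hasEnd bool
}

// parseBound turns a hex prefix such as "80" into 0x8000000000000000 by
// right-padding it with zero bytes to 8 bytes.
func parseBound(s string) (uint64, error) {
	if len(s) > 16 {
		return 0, fmt.Errorf("bound %q is longer than 8 bytes", s)
	}
	b, err := hex.DecodeString(s)
	if err != nil {
		return 0, fmt.Errorf("invalid bound %q: %w", s, err)
	}
	var v uint64
	for i := 0; i < 8; i++ {
		v <<= 8
		if i < len(b) {
			v |= uint64(b[i])
		}
	}
	return v, nil
}

// parseKeyRange accepts "-80", "80-", "80-c0", and "" or "0" for unsharded.
func parseKeyRange(s string) (keyRange, error) {
	if s == "" || s == "0" {
		return keyRange{}, nil // whole key space
	}
	lower, upper, ok := strings.Cut(s, "-")
	if !ok {
		return keyRange{}, fmt.Errorf("invalid key range %q", s)
	}
	var kr keyRange
	var err error
	if lower != "" {
		if kr.start, err = parseBound(lower); err != nil {
			return keyRange{}, err
		}
	}
	if upper != "" {
		if kr.end, err = parseBound(upper); err != nil {
			return keyRange{}, err
		}
		kr.hasEnd = true
	}
	return kr, nil
}

// contains reports whether hash h falls inside the range.
func (kr keyRange) contains(h uint64) bool {
	return h >= kr.start && (!kr.hasEnd || h < kr.end)
}

func main() {
	for _, s := range []string{"-80", "80-c0", "80-", "0"} {
		kr, err := parseKeyRange(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%q contains 0x9000000000000000: %v\n", s, kr.contains(0x9000000000000000))
	}
}
```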