Documentation
Index
- type Applier
- type ApplierConfig
- func NewApplierDefaultConfig() *ApplierConfig
- func (cfg *ApplierConfig) Validate() error
- type ApplyCallback
- type LogicalRow
- type ShardedApplier
- func NewShardedApplier(targets []Target, cfg *ApplierConfig) (*ShardedApplier, error)
- func (a *ShardedApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
- func (a *ShardedApplier) DeleteKeys(ctx context.Context, sourceTable, _ *table.TableInfo, keys []string, ...) (int64, error)
- func (a *ShardedApplier) GetTargets() []Target
- func (a *ShardedApplier) Start(ctx context.Context) error
- func (a *ShardedApplier) Stop() error
- func (a *ShardedApplier) UpsertRows(ctx context.Context, sourceTable, _ *table.TableInfo, rows []LogicalRow, ...) (int64, error)
- func (a *ShardedApplier) Wait(ctx context.Context) error
- type SingleTargetApplier
- func NewSingleTargetApplier(target Target, cfg *ApplierConfig) (*SingleTargetApplier, error)
- func (a *SingleTargetApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
- func (a *SingleTargetApplier) DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, ...) (int64, error)
- func (a *SingleTargetApplier) GetTargets() []Target
- func (a *SingleTargetApplier) Start(ctx context.Context) error
- func (a *SingleTargetApplier) Stop() error
- func (a *SingleTargetApplier) UpsertRows(ctx context.Context, sourceTable, targetTable *table.TableInfo, ...) (int64, error)
- func (a *SingleTargetApplier) Wait(ctx context.Context) error
- type Target
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Applier
type Applier interface {
	// Start initializes the applier and starts its workers.
	Start(ctx context.Context) error
	// Apply sends rows to be written to the target(s).
	// The chunk parameter provides metadata about the source table and target table.
	// The rows parameter contains the actual row data to be written.
	// The callback is invoked when all rows are safely flushed.
	//
	// For the copier: callback will call chunker.Feedback()
	// For the subscription: callback will update binlog coordinates
	Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
	// DeleteKeys deletes rows by their key values synchronously.
	// The keys are hashed key strings (from utils.HashKey).
	// If lock is non-nil, the delete is executed under the table lock.
	// Returns the number of rows affected and any error.
	DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)
	// UpsertRows performs an upsert (INSERT ... ON DUPLICATE KEY UPDATE) synchronously.
	// The rows are LogicalRow structs containing the row images.
	// If lock is non-nil, the upsert is executed under the table lock.
	// Returns the number of rows affected and any error.
	UpsertRows(ctx context.Context, sourceTable, targetTable *table.TableInfo, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)
	// Wait blocks until all pending work is complete and all callbacks have been invoked.
	Wait(ctx context.Context) error
	// Stop stops the applier workers.
	Stop() error
	// GetTargets returns target information for direct database access.
	// This is used by operations like checksum that need to query targets directly.
	// For SingleTargetApplier, this returns a single target.
	// For ShardedApplier, this returns all shards.
	GetTargets() []Target
}
Applier is an interface for applying rows to one or more target databases. Implementations can apply to a single target (SingleTargetApplier) or fan out to multiple targets based on a hash function (ShardedApplier).
The Applier is responsible for:
- Batching/splitting rows into optimal write sizes
- Tracking pending writes
- Invoking callbacks when writes are complete
type ApplierConfig
type ApplierConfig struct {
	Threads         int // number of write threads
	ChunkletMaxRows int
	ChunkletMaxSize int
	Logger          *slog.Logger
	DBConfig        *dbconn.DBConfig
}
func NewApplierDefaultConfig
func NewApplierDefaultConfig() *ApplierConfig
NewApplierDefaultConfig returns a default config for the applier.
func (*ApplierConfig) Validate
func (cfg *ApplierConfig) Validate() error
Validate checks the ApplierConfig for required fields.
type ApplyCallback
ApplyCallback is invoked when rows have been safely flushed to the target(s). affectedRows is the total number of rows affected across all targets. err is non-nil if there was an error applying the rows.
type LogicalRow
LogicalRow represents the current state of a row in the subscription buffer. The row is either marked as deleted or carries a RowImage that describes its contents. A RowImage must be converted into the RowImage of the newTable before it is applied.
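Because the buffer holds only the current state of each row, successive changes to the same primary key collapse into one entry. The sketch below assumes a hypothetical `logicalRow` with `Deleted` and `RowImage` fields and a buffer keyed by the hashed primary-key string; the real LogicalRow fields and buffer are not shown here.

```go
package main

import "fmt"

// logicalRow is a hypothetical stand-in for LogicalRow: either a tombstone
// (Deleted) or the latest full row image.
type logicalRow struct {
	Deleted  bool
	RowImage []any
}

// buffer collapses change events by primary key: later events overwrite
// earlier ones, so only the final state of each row is retained.
type buffer map[string]logicalRow

func (b buffer) upsert(pk string, image []any) { b[pk] = logicalRow{RowImage: image} }
func (b buffer) delete(pk string)              { b[pk] = logicalRow{Deleted: true} }

// split separates the buffer into keys to delete and rows to upsert,
// matching the DeleteKeys/UpsertRows split of the Applier interface.
func (b buffer) split() (deletes []string, upserts []logicalRow) {
	for pk, row := range b {
		if row.Deleted {
			deletes = append(deletes, pk)
		} else {
			upserts = append(upserts, row)
		}
	}
	return deletes, upserts
}

func main() {
	b := buffer{}
	b.upsert("1", []any{1, "alice"})
	b.upsert("1", []any{1, "alicia"}) // overwrites the first image
	b.upsert("2", []any{2, "bob"})
	b.delete("2") // the insert of pk 2 is never applied
	dels, ups := b.split()
	fmt.Println(dels, len(ups), ups[0].RowImage)
}
```

This "last state wins" behaviour is also why a change to the sharding column cannot be tracked safely: the earlier image, and therefore the old shard, is gone.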
type ShardedApplier
ShardedApplier applies rows to multiple target databases based on a Vitess-style vindex. It extracts a specific column value from each row, applies a hash function to it, and routes the row to the appropriate shard based on the hash value and key ranges.
The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.
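The routing step can be sketched as: hash the sharding-column value into an 8-byte keyspace ID, then pick the target whose [start, end) key range contains it. The hash used here (first 8 bytes of SHA-256 of the value's decimal form) is a placeholder assumption; it is not the hash used by Vitess's `hash` vindex and not necessarily what TableInfo.HashFunc computes. The `shard` type and `route` helper are hypothetical.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
	"strconv"
)

// shard is a hypothetical pre-parsed target: Start is inclusive, End is
// exclusive, and a nil bound means "unbounded" on that side.
type shard struct {
	Name       string
	Start, End []byte
}

func (s shard) contains(id []byte) bool {
	return (s.Start == nil || bytes.Compare(id, s.Start) >= 0) &&
		(s.End == nil || bytes.Compare(id, s.End) < 0)
}

// keyspaceID is a placeholder hash: the first 8 bytes of SHA-256 of the
// value's decimal form. Real vindexes use their own hash functions.
func keyspaceID(v int64) []byte {
	sum := sha256.Sum256([]byte(strconv.FormatInt(v, 10)))
	return sum[:8]
}

// route returns the index of the shard that owns the row's sharding value.
func route(shards []shard, v int64) (int, error) {
	id := keyspaceID(v)
	for i, s := range shards {
		if s.contains(id) {
			return i, nil
		}
	}
	return -1, fmt.Errorf("no shard covers keyspace id %x", id)
}

func main() {
	shards := []shard{
		{Name: "-80", End: []byte{0x80}},
		{Name: "80-", Start: []byte{0x80}},
	}
	for _, userID := range []int64{1, 2, 3, 42} {
		i, err := route(shards, userID)
		if err != nil {
			panic(err)
		}
		fmt.Printf("user_id=%d -> shard %s\n", userID, shards[i].Name)
	}
}
```

Since every row with the same sharding value hashes to the same keyspace ID, all of its writes land on one shard, provided the value never changes.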
func NewShardedApplier
func NewShardedApplier(targets []Target, cfg *ApplierConfig) (*ShardedApplier, error)
NewShardedApplier creates a new ShardedApplier with multiple target databases.
The sharding column and hash function are configured per-table in the TableInfo.ShardingColumn and TableInfo.HashFunc fields. This allows different tables to use different sharding keys in multi-table migrations.
func (*ShardedApplier) Apply
func (a *ShardedApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
Apply sends rows to be written to the appropriate target shards. Rows are distributed across shards based on the sharding column and hash function configured in the chunk's Table.ShardingColumn and Table.HashFunc.
func (*ShardedApplier) DeleteKeys
func (a *ShardedApplier) DeleteKeys(ctx context.Context, sourceTable, _ *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)
DeleteKeys deletes rows by their key values synchronously, broadcasting to all shards. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, operations are executed under table locks (one per shard).
Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). For this reason we can't extract the vindex value, and must instead broadcast the deletes to all shards. The vindex value is considered immutable, and we will error if it changes on an update.
Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.
func (*ShardedApplier) GetTargets
func (a *ShardedApplier) GetTargets() []Target
GetTargets returns the target database configurations for direct access. This is used by operations like checksum that need to query targets directly.
func (*ShardedApplier) Start
func (a *ShardedApplier) Start(ctx context.Context) error
Start initializes all shard workers and begins processing. This method is idempotent and can restart the applier after Stop() is called.
func (*ShardedApplier) Stop
func (a *ShardedApplier) Stop() error
Stop signals the applier to shut down gracefully.
func (*ShardedApplier) UpsertRows
func (a *ShardedApplier) UpsertRows(ctx context.Context, sourceTable, _ *table.TableInfo, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)
UpsertRows performs upserts synchronously, distributing across shards. The rows are LogicalRow structs containing the row images. If lock is non-nil, operations are executed under table locks (one per shard).
Note: we only track modifications by PRIMARY KEY, not by shard key (aka primary vindex). This would be a problem if an update changed the vindex column: the row would move to a different shard, but because we only see the last operation for each primary key, we would not know to DELETE the row from its original shard.
We address this by treating the vindex column as immutable. The replication client is told to error on any update to it, and the entire operation is canceled.
This is likely not a significant limitation, as Vitess itself recommends that vindex columns be immutable. If it turns out to be a problem, we can revisit tracking by other columns later.
Note: the sharded applier does not allow any transformations! The targetTable argument is intentionally ignored. This also means that table names between source and target must be the same.
type SingleTargetApplier
SingleTargetApplier applies rows to a single target database. It internally splits rows into chunklets for optimal batching and tracks completion to invoke callbacks when all chunklets for a set of rows are done.
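Chunklet splitting can be sketched as a greedy pass that starts a new chunklet whenever adding the next row would exceed ChunkletMaxRows or ChunkletMaxSize. The row-size estimate (byte lengths, with `fmt.Sprint` for other types) and the greedy policy are assumptions; the package's actual sizing rules are not documented here.

```go
package main

import "fmt"

// estimateRowSize is a rough, hypothetical byte estimate for one row.
func estimateRowSize(row []any) int {
	size := 0
	for _, v := range row {
		switch x := v.(type) {
		case nil:
			size += 4 // roughly the length of "NULL"
		case []byte:
			size += len(x)
		case string:
			size += len(x)
		default:
			size += len(fmt.Sprint(x))
		}
	}
	return size
}

// splitChunklets groups rows greedily so that each chunklet holds at most
// maxRows rows and, where possible, at most maxSize estimated bytes. A single
// row larger than maxSize still forms its own chunklet.
func splitChunklets(rows [][]any, maxRows, maxSize int) [][][]any {
	var chunklets [][][]any
	var cur [][]any
	curSize := 0
	for _, row := range rows {
		sz := estimateRowSize(row)
		if len(cur) > 0 && (len(cur) >= maxRows || curSize+sz > maxSize) {
			chunklets = append(chunklets, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, row)
		curSize += sz
	}
	if len(cur) > 0 {
		chunklets = append(chunklets, cur)
	}
	return chunklets
}

func main() {
	rows := [][]any{{1, "aaaa"}, {2, "bbbb"}, {3, "cccc"}, {4, "dddddddddddddddddddd"}, {5, "e"}}
	for i, c := range splitChunklets(rows, 2, 16) {
		fmt.Printf("chunklet %d: %d rows\n", i, len(c))
	}
}
```

Each chunklet would then be written independently, with a fan-in like the one sketched for ApplyCallback deciding when the caller's callback fires.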
func NewSingleTargetApplier
func NewSingleTargetApplier(target Target, cfg *ApplierConfig) (*SingleTargetApplier, error)
NewSingleTargetApplier creates a new SingleTargetApplier.
func (*SingleTargetApplier) Apply
func (a *SingleTargetApplier) Apply(ctx context.Context, chunk *table.Chunk, rows [][]any, callback ApplyCallback) error
Apply sends rows to be written to the target database.
func (*SingleTargetApplier) DeleteKeys
func (a *SingleTargetApplier) DeleteKeys(ctx context.Context, sourceTable, targetTable *table.TableInfo, keys []string, lock *dbconn.TableLock) (int64, error)
DeleteKeys deletes rows by their key values synchronously. The keys are hashed key strings (from utils.HashKey). If lock is non-nil, the delete is executed under the table lock.
func (*SingleTargetApplier) GetTargets
func (a *SingleTargetApplier) GetTargets() []Target
GetTargets returns the target database configuration for direct access. This is used by operations like checksum that need to query targets directly.
func (*SingleTargetApplier) Start
func (a *SingleTargetApplier) Start(ctx context.Context) error
Start initializes the applier's async write workers and begins processing. It does not control the synchronous methods such as UpsertRows and DeleteKeys. This method is idempotent: calling it multiple times is safe.
func (*SingleTargetApplier) Stop
func (a *SingleTargetApplier) Stop() error
Stop signals the applier to shut down gracefully. It does not control the synchronous methods such as UpsertRows and DeleteKeys, which can continue after Stop() is called. This method is idempotent: calling it multiple times is safe.
func (*SingleTargetApplier) UpsertRows
func (a *SingleTargetApplier) UpsertRows(ctx context.Context, sourceTable, targetTable *table.TableInfo, rows []LogicalRow, lock *dbconn.TableLock) (int64, error)
UpsertRows performs an upsert (INSERT ... ON DUPLICATE KEY UPDATE) synchronously. The rows are LogicalRow structs containing the row images. If lock is non-nil, the upsert is executed under the table lock.
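The statement shape can be sketched as a multi-row INSERT ... ON DUPLICATE KEY UPDATE with one placeholder tuple per row. The helper, the plain `[][]any` rows (instead of LogicalRow), and the use of `VALUES(col)` in the update clause are assumptions; `VALUES()` in this position is deprecated since MySQL 8.0.20 in favor of row aliases but is still accepted. The package's actual SQL is not shown here.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent backtick-quotes a MySQL identifier, doubling embedded backticks.
func quoteIdent(s string) string {
	return "`" + strings.ReplaceAll(s, "`", "``") + "`"
}

// buildUpsert returns a parameterized multi-row upsert and its arguments.
// Columns in pkCols are not rewritten in the UPDATE clause. cols must be non-empty.
func buildUpsert(table string, cols, pkCols []string, rows [][]any) (string, []any) {
	isPK := map[string]bool{}
	for _, c := range pkCols {
		isPK[c] = true
	}
	quoted := make([]string, len(cols))
	var updates []string
	for i, c := range cols {
		quoted[i] = quoteIdent(c)
		if !isPK[c] {
			updates = append(updates, fmt.Sprintf("%s = VALUES(%s)", quoted[i], quoted[i]))
		}
	}
	if len(updates) == 0 {
		// All columns are key columns: a no-op assignment keeps the SQL valid.
		updates = append(updates, fmt.Sprintf("%s = %s", quoted[0], quoted[0]))
	}
	tuple := "(" + strings.TrimSuffix(strings.Repeat("?, ", len(cols)), ", ") + ")"
	tuples := make([]string, len(rows))
	var args []any
	for i, r := range rows {
		tuples[i] = tuple
		args = append(args, r...)
	}
	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s ON DUPLICATE KEY UPDATE %s",
		quoteIdent(table), strings.Join(quoted, ", "), strings.Join(tuples, ", "), strings.Join(updates, ", "))
	return q, args
}

func main() {
	q, args := buildUpsert("users", []string{"id", "name"}, []string{"id"},
		[][]any{{1, "alice"}, {2, "bob"}})
	fmt.Println(q)
	fmt.Println(args)
}
```

Upserting the full row image makes the operation idempotent, which suits replaying buffered changes where the final state per key is all that matters.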
type Target
type Target struct {
	DB       *sql.DB
	Config   *mysql.Config
	KeyRange string // Vitess-style key range: "-80", "80-", "80-c0", or "0" for unsharded
}
Target represents a shard target with its database connection, configuration, and key range. Key ranges are expressed as Vitess-style strings (e.g., "-80", "80-", "80-c0"). An empty string or "0" means the entire key space (unsharded).
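Parsing these strings can be sketched as splitting on "-" and hex-decoding each side into an inclusive start and an exclusive end, with an empty side meaning unbounded; keyspace IDs are then compared byte-wise, as Vitess does. The `keyRange` type and the `parseKeyRange` and `contains` helpers are hypothetical, not part of this package's documented API.

```go
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
	"strings"
)

// keyRange is a hypothetical parsed form of Target.KeyRange.
// Start is inclusive, End is exclusive; nil means unbounded.
type keyRange struct {
	Start, End []byte
}

// parseKeyRange accepts "-80", "80-", "80-c0", and "" or "0" (full range).
func parseKeyRange(s string) (keyRange, error) {
	if s == "" || s == "0" {
		return keyRange{}, nil
	}
	startHex, endHex, ok := strings.Cut(s, "-")
	if !ok {
		return keyRange{}, fmt.Errorf("invalid key range %q: missing '-'", s)
	}
	var kr keyRange
	var err error
	if startHex != "" {
		if kr.Start, err = hex.DecodeString(startHex); err != nil {
			return keyRange{}, fmt.Errorf("invalid start in %q: %w", s, err)
		}
	}
	if endHex != "" {
		if kr.End, err = hex.DecodeString(endHex); err != nil {
			return keyRange{}, fmt.Errorf("invalid end in %q: %w", s, err)
		}
	}
	if kr.Start != nil && kr.End != nil && bytes.Compare(kr.Start, kr.End) >= 0 {
		return keyRange{}, fmt.Errorf("invalid key range %q: start must be below end", s)
	}
	return kr, nil
}

// contains reports whether a keyspace ID falls inside the range.
func (kr keyRange) contains(id []byte) bool {
	return (kr.Start == nil || bytes.Compare(id, kr.Start) >= 0) &&
		(kr.End == nil || bytes.Compare(id, kr.End) < 0)
}

func main() {
	id := []byte{0x9a, 0x01}
	for _, s := range []string{"-80", "80-", "80-c0", "0"} {
		kr, err := parseKeyRange(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%-8q contains %x: %v\n", s, id, kr.contains(id))
	}
}
```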