copier

package
v0.11.0
Published: Mar 4, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

README

Copier

The copier package is responsible for copying rows from a source table to a target table during schema change and move operations. It orchestrates the parallel execution of chunks, integrates with throttlers to manage system load, and provides progress tracking with ETA estimation.

Design Philosophy

The copier was designed to be simple and reliable. It delegates the complexity of:

  • Chunking strategy to pkg/table (see table.Chunker and table.NewChunker)
  • Throttling decisions to pkg/throttler
  • Change application (for buffered mode) to pkg/applier

This separation of concerns makes the copier easier to test and maintain. The copier's job is to:

  1. Request chunks from the chunker
  2. Check with the throttler before processing
  3. Copy the chunk (either directly or via an applier)
  4. Provide feedback to the chunker for adaptive sizing
  5. Track progress and estimate completion time

Implementations

Spirit provides two copier implementations:

Unbuffered Copier (Default)

The unbuffered copier uses INSERT IGNORE INTO ... SELECT statements to copy data directly within MySQL. This is the default and recommended implementation for schema changes.

Advantages:

  • Minimal data transfer between Spirit and MySQL
  • Fewer edge cases for data corruption (charset/timezone conversions handled by MySQL)
  • Simpler code path with fewer moving parts

Disadvantages:

  • INSERT ... SELECT is locking and doesn't use MVCC on the SELECT side
  • Cannot be used for cross-server migrations (move/copy operations)

The locking issue is mitigated by using smaller chunks with dynamic chunk sizing, which yields locks frequently enough to avoid blocking other queries.

Buffered Copier

The buffered copier implements a producer/consumer pattern inspired by DBLog. Multiple reader goroutines extract rows from the source table and send them to an applier, which breaks them into chunklets and writes them to the target.

Advantages:

  • Can copy data between different MySQL servers
  • Uses MVCC-friendly SELECT statements
  • Required for move operations and sharded migrations

Disadvantages:

  • More complex code with additional failure modes
  • Higher network overhead between Spirit and MySQL
  • More CPU usage for serialization/deserialization

Status: The buffered copier is considered stable. It is primarily used for move operations where cross-server copying is required, but can also be used for schema changes by passing the --buffered option to spirit.

Interface

All copiers implement the Copier interface:

type Copier interface {
    Run(ctx context.Context) error
    GetETA() string
    GetChunker() table.Chunker
    SetThrottler(throttler throttler.Throttler)
    GetThrottler() throttler.Throttler
    StartTime() time.Time
    GetProgress() string
}
Methods
  • Run(ctx): Starts the copy process and blocks until completion or error. Spawns multiple worker goroutines based on the configured concurrency level.
  • GetETA(): Returns estimated time to completion as a human-readable string. Returns "TBD" during the initial warmup period (1 minute), "DUE" when >99.99% complete, or a duration like "2h30m15s".
  • GetProgress(): Returns progress as "copied/total percentage%" (e.g., "1000000/5000000 20.00%").
  • GetChunker(): Returns the underlying chunker for accessing detailed progress information.
  • SetThrottler(throttler): Updates the throttler used to control copy rate.
  • GetThrottler(): Returns the current throttler.
  • StartTime(): Returns when the copy operation started.

Configuration

Create a copier using NewCopier() with a CopierConfig:

type CopierConfig struct {
    Concurrency                   int
    TargetChunkTime               time.Duration
    Throttler                     throttler.Throttler
    Logger                        *slog.Logger
    MetricsSink                   metrics.Sink
    DBConfig                      *dbconn.DBConfig
    Applier                       applier.Applier
}
Configuration Options
  • Concurrency (default: 4): Number of parallel workers copying chunks. Higher values increase throughput but also increase load on MySQL.
  • TargetChunkTime (default: 1000ms): Recommended target time for processing each chunk. This field is not read by NewCopier directly; instead, pass it to table.NewChunker(...) (or your chunker implementation) so the chunker can use feedback to dynamically adjust chunk sizes.
  • Throttler (default: Noop): Controls when copying should pause to protect system health. See pkg/throttler for implementations.
  • Logger (default: slog.Default()): Structured logger for debugging and monitoring.
  • MetricsSink (default: NoopSink): Destination for metrics like chunk processing time and row counts.
  • DBConfig: Database connection configuration including retry settings.
  • Applier: When non-nil, the buffered copier is used instead of the unbuffered copier. The applier handles writing rows to the target.

Usage

Basic Example (Unbuffered)
// Create TableInfo for source and target tables
sourceTable := table.NewTableInfo(db, "mydb", "mytable")
if err := sourceTable.SetInfo(ctx); err != nil {
    return err
}
targetTable := table.NewTableInfo(db, "mydb", "_mytable_new")
if err := targetTable.SetInfo(ctx); err != nil {
    return err
}

// Create a chunker for the table
targetChunkTime := 30 * time.Second
chunker, err := table.NewChunker(sourceTable, targetTable, targetChunkTime, slog.Default())
if err != nil {
    return err
}

// Open the chunker before use
if err := chunker.Open(); err != nil {
    return err
}

// Create copier with default config
config := copier.NewCopierDefaultConfig()
config.Concurrency = 8
config.Throttler = myThrottler

copier, err := copier.NewCopier(db, chunker, config)
if err != nil {
    return err
}

// Start copying
if err := copier.Run(ctx); err != nil {
    return err
}

fmt.Printf("Copy completed in %s\n", time.Since(copier.StartTime()))
Progress Monitoring
// Start copier in background
go func() {
    if err := copier.Run(ctx); err != nil {
        log.Error("copy failed", "error", err)
    }
}()

// Monitor progress
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
    select {
    case <-ctx.Done():
        return
    case <-ticker.C:
        progress := copier.GetProgress()
        eta := copier.GetETA()
        fmt.Printf("Progress: %s, ETA: %s\n", progress, eta)
    }
}
Buffered Copier Example
// Create applier for buffered mode
applierConfig := applier.NewApplierDefaultConfig()
// customize applierConfig.Logger, applierConfig.DBConfig, and other fields as needed

target := applier.Target{
    DB: targetDB,
    // KeyRange: keyRange, // populate as appropriate for your use case
}

rowApplier, err := applier.NewSingleTargetApplier(target, applierConfig)
if err != nil {
    return err
}

// Create copier with buffered mode by specifying an applier
config := copier.NewCopierDefaultConfig()
config.Applier = rowApplier

copier, err := copier.NewCopier(sourceDB, chunker, config)
if err != nil {
    return err
}

if err := copier.Run(ctx); err != nil {
    return err
}

Core Concepts

Chunker Integration

The copier is tightly integrated with the chunker in pkg/table (see pkg/table/chunker.go and related files):

  1. Chunk Requests: The copier calls chunker.Next() to get the next chunk to process.
  2. Feedback Loop: After processing each chunk, the copier calls chunker.Feedback(chunk, processingTime, affectedRows).
  3. Dynamic Sizing: The chunker uses feedback to adjust chunk sizes, aiming for the target chunk time.
  4. Progress Tracking: The copier delegates progress calculation to the chunker via chunker.Progress().

This design allows the chunker to optimize chunk sizes based on actual performance, adapting to table characteristics and system load.

Parallelism

Both copier implementations use goroutines for parallel chunk processing:

Unbuffered:

  • Uses errgroup.WithContext() with a concurrency limit
  • Schedules one goroutine per chunk: each goroutine copies a single chunk and returns
  • Stops on first error

Buffered:

  • Fixed number of reader goroutines (equal to concurrency)
  • Each reader goroutine reads chunks and sends rows to the applier
  • The applier has its own internal parallelism for writing
  • Callbacks notify readers when writes complete
Error Handling

Both implementations fail fast on errors:

  • Any error during chunk processing sets an isInvalid flag
  • The flag causes all workers to stop requesting new chunks
  • The error is returned from Run()
  • No automatic retries at the copier level (writes use dbconn.RetryableTransaction for retries)
ETA Estimation

The copier provides sophisticated ETA estimation:

  1. Warmup Period: Returns "TBD" for the first minute to allow for stabilization
  2. Rate Calculation: Every 10 seconds, calculates rows/second based on progress
  3. Remaining Time: Divides remaining rows by current rate
  4. Historical Comparison: Tracks ETA history at 1-hour increments and shows whether it is improving or worsening (e.g., "2h30m (15m from 1h ago)" means the ETA improved by 15 minutes compared to an hour ago, while "2h30m (-15m from 1h ago)" would mean it got 15 minutes worse)
  5. Nearly Complete: Returns "DUE" when >99.99% complete

The ETA adapts to changing conditions like throttling, system load, or chunk size adjustments.

Metrics

The copier emits metrics for each chunk:

  • chunk_processing_time (gauge): Time in milliseconds to process the chunk
  • chunk_num_logical_rows (counter): Number of rows in the chunk range (may include gaps)
  • chunk_num_affected_rows (counter): Actual number of rows copied

These metrics help monitor copy performance and identify bottlenecks.

Implementation Details

Unbuffered Implementation

The unbuffered copier (unbuffered.go) uses a simple worker pool pattern:

func (c *Unbuffered) Run(ctx context.Context) error {
    g, errGrpCtx := errgroup.WithContext(ctx)
    g.SetLimit(c.concurrency) // at most c.concurrency chunks in flight

    for !c.chunker.IsRead() && c.isHealthy(errGrpCtx) {
        g.Go(func() error {
            chunk, err := c.chunker.Next()
            if err != nil {
                if err == table.ErrTableIsRead {
                    return nil // another worker consumed the final chunk
                }
                c.setInvalid(true)
                return err
            }
            if err := c.CopyChunk(errGrpCtx, chunk); err != nil {
                c.setInvalid(true)
                return err
            }
            return nil
        })
    }

    return g.Wait() // returns the first error, if any
}

Each chunk is copied with:

INSERT IGNORE INTO new_table (cols)
SELECT cols FROM old_table FORCE INDEX (PRIMARY)
WHERE <chunk_range>

The INSERT IGNORE is used because resuming from a checkpoint may re-apply some previously executed work.

Buffered Implementation

The buffered copier (buffered.go) uses a producer/consumer pattern:

  1. Reader Workers: Multiple goroutines read chunks from the source table into memory
  2. Applier Queue: Rows are sent to the applier with a callback
  3. Write Workers: The applier's internal workers write chunklets in parallel
  4. Callback Invocation: When all chunklets for a batch complete, the callback is invoked
  5. Feedback: The callback sends feedback to the chunker and emits metrics

This architecture allows for:

  • Overlapping read and write operations
  • Cross-server copying (source and target can be different databases)
  • Fine-grained control over write batch sizes via the applier

The buffered copier must coordinate shutdown carefully:

  1. Wait for all readers to finish
  2. Wait for the applier to process all pending work
  3. Stop the applier (but don't close DB connections)
Throttler Integration

Both implementations check the throttler before processing each chunk:

c.throttler.BlockWait(ctx)

This call blocks if throttler.IsThrottled() returns true, pausing the copy operation until conditions improve. The throttler is pluggable: a built-in implementation throttles on high replication lag, and in the future other implementations may be added, such as an external throttling service (Freno, Doorman).

See pkg/throttler for details on throttler implementations.

See Also

Documentation

Overview

Package copier copies rows from one table to another. It makes use of table.Chunker, and handles the parallelism and retries here. It fails on the first error.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Copier

type Copier interface {
	Run(ctx context.Context) error
	GetETA() string
	GetChunker() table.Chunker
	SetThrottler(throttler throttler.Throttler)
	GetThrottler() throttler.Throttler
	StartTime() time.Time
	GetProgress() string
}

Copier is the interface which copiers use. The default implementation is called unbuffered because it uses INSERT .. SELECT without any intermediate buffering in Spirit; a buffered implementation is also available. See: https://github.com/block/spirit/issues/451

func NewCopier

func NewCopier(db *sql.DB, chunker table.Chunker, config *CopierConfig) (Copier, error)

NewCopier creates a new copier object with the provided chunker. The chunker could have been opened at a watermark, we are agnostic to that. It could also return different tables on each Next() call in future, so we don't save any fields related to the table.

type CopierConfig

type CopierConfig struct {
	Concurrency     int
	TargetChunkTime time.Duration
	Throttler       throttler.Throttler
	Logger          *slog.Logger
	MetricsSink     metrics.Sink
	DBConfig        *dbconn.DBConfig
	Applier         applier.Applier
}

func NewCopierDefaultConfig

func NewCopierDefaultConfig() *CopierConfig

NewCopierDefaultConfig returns a default config for the copier.

type Unbuffered

type Unbuffered struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*Unbuffered) CopyChunk

func (c *Unbuffered) CopyChunk(ctx context.Context, chunk *table.Chunk) error

CopyChunk copies a chunk from the table to the newTable. It is exported so it can be used incrementally in tests.

func (*Unbuffered) GetChunker

func (c *Unbuffered) GetChunker() table.Chunker

GetChunker returns the chunker for accessing progress information

func (*Unbuffered) GetETA

func (c *Unbuffered) GetETA() string

func (*Unbuffered) GetProgress

func (c *Unbuffered) GetProgress() string

GetProgress returns the progress of the copier

func (*Unbuffered) GetThrottler

func (c *Unbuffered) GetThrottler() throttler.Throttler

func (*Unbuffered) Run

func (c *Unbuffered) Run(ctx context.Context) error

func (*Unbuffered) SetThrottler

func (c *Unbuffered) SetThrottler(throttler throttler.Throttler)

func (*Unbuffered) StartTime

func (c *Unbuffered) StartTime() time.Time
