throttler

package
v0.10.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 2, 2026 License: Apache-2.0 Imports: 6 Imported by: 0

README

Throttlers

Throttlers are designed to limit the rate of changes pushed through the copier, helping maintain the health of the system during migrations.

Design Philosophy

Throttlers were designed as an interface from the start, because it was always believed there would be different use cases for when a schema change should be throttled. The original vision included:

  • Replication lag throttler: Monitors replica lag and pauses when replicas fall behind
  • HTTP service integration (e.g., Freno or Doorman): Allows schema changes and other background tasks to coordinate through a centralized throttling service
  • Custom implementations: The interface allows users to implement their own throttling strategies, such as reducing migration activity during stock trading hours or at least market open

Current State

In practice, throttlers haven't been used as extensively as originally envisioned. Because Spirit is primarily used with Aurora at Block, the replica throttler sees limited internal use. We have also found that by using Dynamic Chunking in the copier, schema changes actually self-throttle pretty well without a throttler.

However, it remains available and maintained for community use, particularly for users running traditional MySQL replication topologies. We are open to contributions to throttler improvements, such as being able to throttle on multiple replicas at once (issue #220).

Interface

All throttlers implement the Throttler interface:

type Throttler interface {
    Open(ctx context.Context) error
    Close() error
    IsThrottled() bool
    BlockWait(ctx context.Context)
    UpdateLag(ctx context.Context) error
}

Implementations

Noop Throttler

The default throttler that performs no throttling. Used when throttling is not required.

throttler := &throttler.Noop{}
Mock Throttler

A throttler used internally by the test suite to help reduce race conditions when running migration tests across different types of hardware. It injects 1 second of sleep every time BlockWait() is called.

Replication Throttler

Monitors replication lag on MySQL 8.0+ replicas using performance_schema metrics. This provides more accurate lag measurements than the traditional SHOW SLAVE STATUS approach.

throttler, err := throttler.NewReplicationThrottler(
    replicaDB,
    120*time.Second,  // lag tolerance
    logger,
)

Features:

  • Uses performance_schema for accurate lag calculation
  • Monitors both applier latency and queue latency
  • Automatically detects idle replicas to avoid false positives
  • Checks lag every 5 seconds by default
  • Blocks copy operations when lag exceeds tolerance (default: up to 60 seconds per check)

Usage

Throttlers are integrated into the copier and automatically pause chunk copying when the system is under stress:

copier := &copier.Unbuffered{
    Throttler: throttler,
    // ... other config
}

During migration, the copier calls throttler.BlockWait(ctx) before each chunk, pausing operations if IsThrottled() returns true.

Extending

To implement a custom throttler (e.g., for Freno integration):

  1. Implement the Throttler interface
  2. Start any background monitoring in Open()
  3. Update throttling state based on your metrics
  4. Return the current state in IsThrottled()
  5. Block appropriately in BlockWait() with context support

Example structure:

type CustomThrottler struct {
    isThrottled atomic.Bool
    // ... your fields
}

func (t *CustomThrottler) Open(ctx context.Context) error {
    // Start monitoring
    go t.monitor(ctx)
    return nil
}

func (t *CustomThrottler) IsThrottled() bool {
    return t.isThrottled.Load()
}

func (t *CustomThrottler) BlockWait(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    
    for t.IsThrottled() {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // Continue checking
        }
    }
}

See Also

Documentation

Overview

Package throttler contains code to throttle the rate of writes to a table.

Index

Constants

View Source
const MySQL8LagQuery = `` /* 1004-byte string literal not displayed */

MySQL8LagQuery is a query that is used to get the lag between the source and the replica. The implementation is described in https://github.com/block/spirit/issues/286 It uses performance_schema instead of a heartbeat injection or seconds_behind_source.

Variables

This section is empty.

Functions

This section is empty.

Types

type Mock added in v0.10.2

type Mock struct {
}

func (*Mock) BlockWait added in v0.10.2

func (t *Mock) BlockWait(ctx context.Context)

func (*Mock) Close added in v0.10.2

func (t *Mock) Close() error

func (*Mock) IsThrottled added in v0.10.2

func (t *Mock) IsThrottled() bool

func (*Mock) Open added in v0.10.2

func (t *Mock) Open(_ context.Context) error

func (*Mock) UpdateLag added in v0.10.2

func (t *Mock) UpdateLag(ctx context.Context) error

type Noop

type Noop struct {
	// contains filtered or unexported fields
}

func (*Noop) BlockWait

func (t *Noop) BlockWait(ctx context.Context)

func (*Noop) Close

func (t *Noop) Close() error

func (*Noop) IsThrottled

func (t *Noop) IsThrottled() bool

func (*Noop) Open

func (t *Noop) Open(_ context.Context) error

func (*Noop) UpdateLag

func (t *Noop) UpdateLag(ctx context.Context) error

type Replica added in v0.10.2

type Replica struct {
	// contains filtered or unexported fields
}

func (*Replica) BlockWait added in v0.10.2

func (l *Replica) BlockWait(ctx context.Context)

BlockWait blocks until the lag is within the tolerance, or up to 60s to allow some progress to be made. It respects context cancellation.

func (*Replica) Close added in v0.10.2

func (l *Replica) Close() error

func (*Replica) IsThrottled added in v0.10.2

func (l *Replica) IsThrottled() bool

func (*Replica) Open added in v0.10.2

func (l *Replica) Open(ctx context.Context) error

Open starts the lag monitor. This is not gh-ost. The lag monitor is primitive because the requirement is only for DR, and not for up-to-date read-replicas. Because chunk-sizes are typically 500ms, getting fine-grained metrics is not realistic. We only check the replica every 5 seconds, and typically allow up to 120s of replica lag, which is a lot.

func (*Replica) UpdateLag added in v0.10.2

func (l *Replica) UpdateLag(ctx context.Context) error

UpdateLag is a MySQL 8.0+ implementation of lag that is a better approximation than "seconds_behind_source". It requires performance_schema to be enabled.

type Throttler

type Throttler interface {
	Open(ctx context.Context) error
	Close() error
	IsThrottled() bool
	BlockWait(ctx context.Context)
	UpdateLag(ctx context.Context) error
}

func NewReplicationThrottler

func NewReplicationThrottler(replica *sql.DB, lagTolerance time.Duration, logger *slog.Logger) (Throttler, error)

NewReplicationThrottler returns a Throttler for MySQL 8.0+ replicas. It uses performance_schema to monitor replication lag.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL