repl

package
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 19, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package repl contains binary log subscription functionality.

Index

Constants

View Source
const (

	// DefaultBatchSize is the number of rows in each batched REPLACE/DELETE statement.
	// Larger is better, but we need to keep the run-time of the statement well below
	// dbconn.maximumLockTime so that it doesn't prevent copy-row tasks from failing.
	// Since on some of our Aurora tables with out-of-cache workloads only copy ~300 rows per second,
	// we probably shouldn't set this any larger than about 1K. It will also use
	// multiple-flush-threads, which should help it group commit and still be fast.
	// This is only used as an initial starting value. It will auto-scale based on the DefaultTargetBatchTime.
	DefaultBatchSize = 1000

	// DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements.
	DefaultTargetBatchTime = time.Millisecond * 500

	// DefaultFlushInterval is the time that the client will flush all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second
	// DefaultTimeout is how long BlockWait is supposed to wait before returning errors.
	DefaultTimeout = 30 * time.Second
)

Variables

This section is empty.

Functions

func EncodeSchemaTable

func EncodeSchemaTable(schema, table string) string

func NewServerID

func NewServerID() uint32

NewServerID randomizes the server ID to avoid conflicts with other binlog readers. This uses the same logic as canal:

Types

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db *sql.DB, host string, username, password string, config *ClientConfig) *Client

NewClient creates a new Client instance.

func (*Client) AddSubscription

func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.Chunker) error

AddSubscription adds a new subscription. Returns an error if a subscription already exists for the given table.

func (*Client) AllChangesFlushed

func (c *Client) AllChangesFlushed() bool

func (*Client) BlockWait

func (c *Client) BlockWait(ctx context.Context) error

BlockWait blocks until all changes are *buffered*. i.e. the server's current position is 1234, but our buffered position is only 100. We need to read all the events until we reach >= 1234. We do not need to guarantee that they are flushed though, so you need to call Flush() to do that. This call times out! The default timeout is 10 seconds, after which an error will be returned.

func (*Client) Close

func (c *Client) Close()

func (*Client) Flush

func (c *Client) Flush(ctx context.Context) error

Flush empties the changeset in a loop until the amount of changes is considered "trivial". The loop is required, because changes continue to be added while the flush is occurring.

func (*Client) FlushUnderTableLock

func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error

FlushUnderTableLock is a final flush under an exclusive table lock using the connection that holds a write lock. Because flushing generates binary log events, we actually want to call flush *twice*:

  • The first time flushes the pending changes to the new table.
  • We then ensure that we have all the binary log changes read from the server.
  • The second time reads through the changes generated by the first flush and updates the in memory applied position to match the server's position. This is required to satisfy the binlog position is updated for the c.AllChangesFlushed() check.

func (*Client) GetBinlogApplyPosition

func (c *Client) GetBinlogApplyPosition() mysql.Position

func (*Client) GetDeltaLen

func (c *Client) GetDeltaLen() int

GetDeltaLen returns the total number of changes that are pending across all subscriptions. Acquires the client lock for thread safety.

func (*Client) Run

func (c *Client) Run(ctx context.Context) (err error)

Run initializes the binlog syncer and starts the binlog reader. It returns an error if the initialization fails.

func (*Client) SetDDLNotificationChannel

func (c *Client) SetDDLNotificationChannel(ch chan string)

func (*Client) SetFlushedPos

func (c *Client) SetFlushedPos(pos mysql.Position)

SetFlushedPos updates the known safe position that all changes have been flushed. It is used for resuming from a checkpoint.

func (*Client) SetWatermarkOptimization added in v0.10.1

func (c *Client) SetWatermarkOptimization(newVal bool)

SetWatermarkOptimization sets both high and low watermark optimizations for all subscriptions. This should be disabled before checksum/cutover to ensure all changes are flushed regardless of watermark position.

func (*Client) StartPeriodicFlush

func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)

StartPeriodicFlush starts a loop that periodically flushes the binlog changeset. This is used by the migrator to ensure the binlog position is advanced.

func (*Client) StopPeriodicFlush

func (c *Client) StopPeriodicFlush()

StopPeriodicFlush disables the periodic flush, also guaranteeing when it returns there is no current flush running

type ClientConfig

type ClientConfig struct {
	TargetBatchTime            time.Duration
	Concurrency                int
	Logger                     *slog.Logger
	OnDDL                      chan string
	ServerID                   uint32
	UseExperimentalBufferedMap bool
	Applier                    applier.Applier
	DBConfig                   *dbconn.DBConfig // Database configuration including TLS settings
}

func NewClientDefaultConfig

func NewClientDefaultConfig() *ClientConfig

NewClientDefaultConfig returns a default config for the copier.

type Subscription

type Subscription interface {
	HasChanged(key, row []any, deleted bool)
	Length() int
	Flush(ctx context.Context, underLock bool, lock *dbconn.TableLock) (allChangesFlushed bool, err error)
	Tables() []*table.TableInfo            // returns the tables related to the subscription in currentTable, newTable order
	SetWatermarkOptimization(enabled bool) // Controls both high and low watermark optimizations
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL