repl

package
v0.10.0
Warning

This package is not in the latest version of its module.

Published: Nov 6, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package repl contains binary log subscription functionality.

Constants

const (

	// DefaultBatchSize is the number of rows in each batched REPLACE/DELETE statement.
	// Larger is better, but we need to keep the run-time of the statement well below
	// dbconn.maximumLockTime so that it doesn't cause copy-row tasks to fail.
	// Since some of our Aurora tables with out-of-cache workloads only copy ~300 rows per second,
	// we probably shouldn't set this any larger than about 1K. It will also use
	// multiple-flush-threads, which should help it group commit and still be fast.
	// This is only used as an initial starting value. It will auto-scale based on the DefaultTargetBatchTime.
	DefaultBatchSize = 1000

	// DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements.
	DefaultTargetBatchTime = time.Millisecond * 500

	// DefaultFlushInterval is the interval at which the client flushes all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second
	// DefaultTimeout is how long BlockWait is supposed to wait before returning errors.
	DefaultTimeout = 30 * time.Second
)

Variables

This section is empty.

Functions

func EncodeSchemaTable

func EncodeSchemaTable(schema, table string) string

func NewServerID

func NewServerID() uint32

NewServerID randomizes the server ID to avoid conflicts with other binlog readers. This uses the same logic as canal.

Types

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db *sql.DB, host string, username, password string, config *ClientConfig) *Client

NewClient creates a new Client instance.

func (*Client) AddSubscription

func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.Chunker) error

AddSubscription adds a new subscription. Returns an error if a subscription already exists for the given table.

func (*Client) AllChangesFlushed

func (c *Client) AllChangesFlushed() bool

func (*Client) BlockWait

func (c *Client) BlockWait(ctx context.Context) error

BlockWait blocks until all changes are *buffered*. For example, if the server's current position is 1234 but our buffered position is only 100, we need to read all events until we reach a position >= 1234. It does not guarantee that the changes are flushed; call Flush() for that. This call times out: the default timeout is DefaultTimeout (30 seconds), after which an error is returned.

func (*Client) Close

func (c *Client) Close()

func (*Client) Flush

func (c *Client) Flush(ctx context.Context) error

Flush empties the changeset in a loop until the amount of changes is considered "trivial". The loop is required, because changes continue to be added while the flush is occurring.
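
To show why a loop is needed, here is a minimal simulation in which new changes arrive during every flush pass. The changeset type, the trivialThreshold value, and the function names are hypothetical; the package does not document what it considers "trivial".

```go
package main

import "fmt"

const trivialThreshold = 10 // hypothetical cut-off for "trivial"

// changeset is a stand-in for the pending binlog changes.
type changeset struct {
	pending  int   // changes waiting to be applied
	incoming []int // changes that arrive while each flush pass is running
}

// flushOnce applies everything pending, then records what arrived meanwhile.
func (c *changeset) flushOnce() {
	c.pending = 0
	if len(c.incoming) > 0 {
		c.pending = c.incoming[0]
		c.incoming = c.incoming[1:]
	}
}

// flushUntilTrivial repeats flushOnce until few enough changes remain and
// returns the number of passes it needed.
func flushUntilTrivial(c *changeset) int {
	passes := 0
	for {
		c.flushOnce()
		passes++
		if c.pending <= trivialThreshold {
			return passes
		}
	}
}

func main() {
	c := &changeset{pending: 5000, incoming: []int{400, 35, 3}}
	passes := flushUntilTrivial(c)
	fmt.Println(passes, c.pending)
}
```

Each pass is shorter than the one before it, so fewer changes accumulate while it runs and the loop converges as long as the write rate is lower than the flush rate.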

func (*Client) FlushUnderTableLock

func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error

FlushUnderTableLock is a final flush under an exclusive table lock, using the connection that holds the write lock. Because flushing generates binary log events, flush is called *twice*, with a read in between:

  • The first flush applies the pending changes to the new table.
  • We then ensure that we have read all the binary log changes from the server.
  • The second flush reads through the changes generated by the first flush and updates the in-memory applied position to match the server's position. This is required so that the binlog position is up to date for the c.AllChangesFlushed() check.
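
The steps above can be sketched with a model that tracks only positions. Everything in it (fakeReplica, its fields, and its methods) is hypothetical and greatly simplified; it exists to show why a single flush leaves the applied position behind the server's position.

```go
package main

import "fmt"

// fakeReplica models only positions; every name here is hypothetical.
type fakeReplica struct {
	serverPos   int // the server's current binlog position
	bufferedPos int // how far we have read
	appliedPos  int // how far we have flushed
	pending     int // buffered changes not yet applied to the new table
}

// flush applies pending changes. Applying rows writes to the new table, which
// itself produces binlog events and so advances the server's position.
func (r *fakeReplica) flush() {
	r.serverPos += r.pending
	r.pending = 0
	r.appliedPos = r.bufferedPos
}

// blockWait reads events until the buffered position reaches the server's.
func (r *fakeReplica) blockWait() {
	r.bufferedPos = r.serverPos
}

// allChangesFlushed reports whether nothing is pending and the applied
// position has caught up with the server.
func (r *fakeReplica) allChangesFlushed() bool {
	return r.pending == 0 && r.appliedPos == r.serverPos
}

// flushUnderLock follows the three steps in order.
func flushUnderLock(r *fakeReplica) {
	r.flush()     // 1. apply pending changes to the new table
	r.blockWait() // 2. read everything the server has, including our own writes
	r.flush()     // 3. nothing left to apply; advance the applied position
}

func main() {
	r := &fakeReplica{serverPos: 1000, bufferedPos: 1000, appliedPos: 900, pending: 50}
	r.flush()
	fmt.Println(r.allChangesFlushed()) // a single flush is not enough

	r = &fakeReplica{serverPos: 1000, bufferedPos: 1000, appliedPos: 900, pending: 50}
	flushUnderLock(r)
	fmt.Println(r.allChangesFlushed())
}
```

The model relies on the table lock: because no other writer can advance the server's position during the sequence, the second flush has nothing new to apply and only moves the applied position forward.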

func (*Client) GetBinlogApplyPosition

func (c *Client) GetBinlogApplyPosition() mysql.Position

func (*Client) GetDeltaLen

func (c *Client) GetDeltaLen() int

GetDeltaLen returns the total number of changes that are pending across all subscriptions. Acquires the client lock for thread safety.

func (*Client) Run

func (c *Client) Run(ctx context.Context) (err error)

Run initializes the binlog syncer and starts the binlog reader. It returns an error if the initialization fails.

func (*Client) SetDDLNotificationChannel

func (c *Client) SetDDLNotificationChannel(ch chan string)

func (*Client) SetFlushedPos

func (c *Client) SetFlushedPos(pos mysql.Position)

SetFlushedPos updates the known safe position up to which all changes have been flushed. It is used for resuming from a checkpoint.

func (*Client) SetKeyAboveWatermarkOptimization

func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool)

SetKeyAboveWatermarkOptimization sets the key-above-watermark optimization for all subscriptions. In the future this should become obsolete.

func (*Client) StartPeriodicFlush

func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)

StartPeriodicFlush starts a loop that periodically flushes the binlog changeset. This is used by the migrator to ensure the binlog position is advanced.

func (*Client) StopPeriodicFlush

func (c *Client) StopPeriodicFlush()

StopPeriodicFlush disables the periodic flush. It also guarantees that no flush is running when it returns.

type ClientConfig

type ClientConfig struct {
	TargetBatchTime            time.Duration
	Concurrency                int
	Logger                     loggers.Advanced
	OnDDL                      chan string
	ServerID                   uint32
	UseExperimentalBufferedMap bool
	WriteDB                    *sql.DB          // if not nil, use this DB for writes.
	DBConfig                   *dbconn.DBConfig // Database configuration including TLS settings
}

func NewClientDefaultConfig

func NewClientDefaultConfig() *ClientConfig

NewClientDefaultConfig returns a default config for the Client.

type LogWrapper

type LogWrapper struct {
	// contains filtered or unexported fields
}

func NewLogWrapper

func NewLogWrapper(logger loggers.Advanced) *LogWrapper

Passing our loggers.Advanced directly would write a lot of spam to the log, so NewLogWrapper returns a wrapper that filters out the noisiest messages.
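
The filtering idea can be sketched with a one-method logger interface. This is not the LogWrapper implementation: infoLogger stands in for the much larger loggers.Advanced interface, and the contents of noisyFragments are invented for illustration, since the messages that are actually filtered are not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// infoLogger is a one-method stand-in for the much larger loggers.Advanced.
type infoLogger interface {
	Infof(format string, args ...any)
}

// noisyFragments is a hypothetical deny-list, invented for this sketch.
var noisyFragments = []string{"rotate to", "heartbeat"}

// filteringLogger forwards Infof calls unless the rendered message looks noisy.
type filteringLogger struct {
	next infoLogger
}

func (f *filteringLogger) Infof(format string, args ...any) {
	msg := fmt.Sprintf(format, args...)
	for _, frag := range noisyFragments {
		if strings.Contains(msg, frag) {
			return // drop the message
		}
	}
	f.next.Infof(format, args...)
}

// recorder collects messages so the effect of the filter is visible.
type recorder struct{ lines []string }

func (r *recorder) Infof(format string, args ...any) {
	r.lines = append(r.lines, fmt.Sprintf(format, args...))
}

func main() {
	rec := &recorder{}
	log := &filteringLogger{next: rec}
	log.Infof("rotate to %s", "binlog.000042")
	log.Infof("subscription added for table %s", "t1")
	fmt.Println(rec.lines)
}
```

A full wrapper would repeat the same check in each of the level methods listed below (Debug, Info, Warn, and so on), or filter only the levels that produce the noise.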

func (*LogWrapper) Debug

func (c *LogWrapper) Debug(args ...any)

func (*LogWrapper) Debugf

func (c *LogWrapper) Debugf(format string, args ...any)

func (*LogWrapper) Debugln

func (c *LogWrapper) Debugln(args ...any)

func (*LogWrapper) Error

func (c *LogWrapper) Error(args ...any)

func (*LogWrapper) Errorf

func (c *LogWrapper) Errorf(format string, args ...any)

func (*LogWrapper) Errorln

func (c *LogWrapper) Errorln(args ...any)

func (*LogWrapper) Fatal

func (c *LogWrapper) Fatal(args ...any)

func (*LogWrapper) Fatalf

func (c *LogWrapper) Fatalf(format string, args ...any)

func (*LogWrapper) Fatalln

func (c *LogWrapper) Fatalln(args ...any)

func (*LogWrapper) Info

func (c *LogWrapper) Info(args ...any)

func (*LogWrapper) Infof

func (c *LogWrapper) Infof(format string, args ...any)

func (*LogWrapper) Infoln

func (c *LogWrapper) Infoln(args ...any)

func (*LogWrapper) Panic

func (c *LogWrapper) Panic(args ...any)

func (*LogWrapper) Panicf

func (c *LogWrapper) Panicf(format string, args ...any)

func (*LogWrapper) Panicln

func (c *LogWrapper) Panicln(args ...any)

func (*LogWrapper) Print

func (c *LogWrapper) Print(args ...any)

func (*LogWrapper) Printf

func (c *LogWrapper) Printf(format string, args ...any)

func (*LogWrapper) Println

func (c *LogWrapper) Println(args ...any)

func (*LogWrapper) Warn

func (c *LogWrapper) Warn(args ...any)

func (*LogWrapper) Warnf

func (c *LogWrapper) Warnf(format string, args ...any)

func (*LogWrapper) Warnln

func (c *LogWrapper) Warnln(args ...any)

type Subscription

type Subscription interface {
	HasChanged(key, row []any, deleted bool)
	Length() int
	Flush(ctx context.Context, underLock bool, lock *dbconn.TableLock) (allChangesFlushed bool, err error)
	Tables() []*table.TableInfo // returns the tables related to the subscription in currentTable, newTable order
	SetKeyAboveWatermarkOptimization(enabled bool)
}
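
To illustrate how HasChanged and Length can interact, and why longer flush intervals permit more merging, here is a deliberately simplified change tracker. It is a sketch only: deltaMap, its encoding of keys, and the drain method are hypothetical, and it does not implement the full Subscription interface (Flush, Tables, and the watermark optimization are omitted).

```go
package main

import "fmt"

// deltaMap is a hypothetical, simplified change tracker. It remembers only the
// most recent change per primary key, so repeated changes to one row merge.
type deltaMap struct {
	changes map[string]bool // encoded key -> true if the latest change was a delete
}

func newDeltaMap() *deltaMap {
	return &deltaMap{changes: make(map[string]bool)}
}

// HasChanged records a change. The row is ignored in this sketch, on the
// assumption that a flush can re-read the current row by its key.
func (d *deltaMap) HasChanged(key, row []any, deleted bool) {
	d.changes[fmt.Sprintf("%#v", key)] = deleted
}

// Length returns the number of distinct keys with a pending change.
func (d *deltaMap) Length() int {
	return len(d.changes)
}

// drain empties the map and reports how many keys would go into a REPLACE
// statement and how many into a DELETE statement.
func (d *deltaMap) drain() (replaces, deletes int) {
	for _, deleted := range d.changes {
		if deleted {
			deletes++
		} else {
			replaces++
		}
	}
	d.changes = make(map[string]bool)
	return replaces, deletes
}

func main() {
	d := newDeltaMap()
	d.HasChanged([]any{1}, nil, false) // insert
	d.HasChanged([]any{1}, nil, false) // update of the same row merges
	d.HasChanged([]any{2}, nil, false) // insert
	d.HasChanged([]any{2}, nil, true)  // the later delete replaces the insert
	fmt.Println(d.Length())
	replaces, deletes := d.drain()
	fmt.Println(replaces, deletes, d.Length())
}
```

Four recorded events collapse into two pending keys here, which is the memory-for-merging trade-off that the DefaultFlushInterval comment describes.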
