Documentation
¶
Overview ¶
Package repl contains binary log subscription functionality.
Index ¶
- Constants
- func EncodeSchemaTable(schema, table string) string
- func NewServerID() uint32
- type Client
- func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.Chunker) error
- func (c *Client) AllChangesFlushed() bool
- func (c *Client) BlockWait(ctx context.Context) error
- func (c *Client) Close()
- func (c *Client) Flush(ctx context.Context) error
- func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error
- func (c *Client) GetBinlogApplyPosition() mysql.Position
- func (c *Client) GetDeltaLen() int
- func (c *Client) Run(ctx context.Context) (err error)
- func (c *Client) SetDDLNotificationChannel(ch chan string)
- func (c *Client) SetFlushedPos(pos mysql.Position)
- func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool)
- func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
- func (c *Client) StopPeriodicFlush()
- type ClientConfig
- type LogWrapper
- func (c *LogWrapper) Debug(args ...any)
- func (c *LogWrapper) Debugf(format string, args ...any)
- func (c *LogWrapper) Debugln(args ...any)
- func (c *LogWrapper) Error(args ...any)
- func (c *LogWrapper) Errorf(format string, args ...any)
- func (c *LogWrapper) Errorln(args ...any)
- func (c *LogWrapper) Fatal(args ...any)
- func (c *LogWrapper) Fatalf(format string, args ...any)
- func (c *LogWrapper) Fatalln(args ...any)
- func (c *LogWrapper) Info(args ...any)
- func (c *LogWrapper) Infof(format string, args ...any)
- func (c *LogWrapper) Infoln(args ...any)
- func (c *LogWrapper) Panic(args ...any)
- func (c *LogWrapper) Panicf(format string, args ...any)
- func (c *LogWrapper) Panicln(args ...any)
- func (c *LogWrapper) Print(args ...any)
- func (c *LogWrapper) Printf(format string, args ...any)
- func (c *LogWrapper) Println(args ...any)
- func (c *LogWrapper) Warn(args ...any)
- func (c *LogWrapper) Warnf(format string, args ...any)
- func (c *LogWrapper) Warnln(args ...any)
- type Subscription
Constants ¶
const ( // DefaultBatchSize is the number of rows in each batched REPLACE/DELETE statement. // Larger is better, but we need to keep the run-time of the statement well below // dbconn.maximumLockTime so that it doesn't prevent copy-row tasks from failing. // Since on some of our Aurora tables with out-of-cache workloads only copy ~300 rows per second, // we probably shouldn't set this any larger than about 1K. It will also use // multiple-flush-threads, which should help it group commit and still be fast. // This is only used as an initial starting value. It will auto-scale based on the DefaultTargetBatchTime. DefaultBatchSize = 1000 // DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements. DefaultTargetBatchTime = time.Millisecond * 500 // DefaultFlushInterval is the time that the client will flush all binlog changes to disk. // Longer values require more memory, but permit more merging. // I expect we will change this to 1hr-24hr in the future. DefaultFlushInterval = 30 * time.Second // DefaultTimeout is how long BlockWait is supposed to wait before returning errors. DefaultTimeout = 30 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func EncodeSchemaTable ¶
func NewServerID ¶
func NewServerID() uint32
NewServerID randomizes the server ID to avoid conflicts with other binlog readers. This uses the same logic as canal:
Types ¶
type Client ¶
func (*Client) AddSubscription ¶
func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.Chunker) error
AddSubscription adds a new subscription. Returns an error if a subscription already exists for the given table.
func (*Client) AllChangesFlushed ¶
func (*Client) BlockWait ¶
BlockWait blocks until all changes are *buffered*. i.e. the server's current position is 1234, but our buffered position is only 100. We need to read all the events until we reach >= 1234. We do not need to guarantee that they are flushed though, so you need to call Flush() to do that. This call times out! The default timeout is 10 seconds, after which an error will be returned.
func (*Client) Flush ¶
Flush empties the changeset in a loop until the amount of changes is considered "trivial". The loop is required, because changes continue to be added while the flush is occurring.
func (*Client) FlushUnderTableLock ¶
FlushUnderTableLock is a final flush under an exclusive table lock using the connection that holds a write lock. Because flushing generates binary log events, we actually want to call flush *twice*:
- The first time flushes the pending changes to the new table.
- We then ensure that we have all the binary log changes read from the server.
- The second time reads through the changes generated by the first flush and updates the in memory applied position to match the server's position. This is required to satisfy the binlog position is updated for the c.AllChangesFlushed() check.
func (*Client) GetBinlogApplyPosition ¶
func (*Client) GetDeltaLen ¶
GetDeltaLen returns the total number of changes that are pending across all subscriptions. Acquires the client lock for thread safety.
func (*Client) Run ¶
Run initializes the binlog syncer and starts the binlog reader. It returns an error if the initialization fails.
func (*Client) SetDDLNotificationChannel ¶
func (*Client) SetFlushedPos ¶
SetFlushedPos updates the known safe position that all changes have been flushed. It is used for resuming from a checkpoint.
func (*Client) SetKeyAboveWatermarkOptimization ¶
SetKeyAboveWatermarkOptimization sets the key above watermark optimization for all subscriptions. In future this should become obsolete!
func (*Client) StartPeriodicFlush ¶
StartPeriodicFlush starts a loop that periodically flushes the binlog changeset. This is used by the migrator to ensure the binlog position is advanced.
func (*Client) StopPeriodicFlush ¶
func (c *Client) StopPeriodicFlush()
StopPeriodicFlush disables the periodic flush, also guaranteeing when it returns there is no current flush running
type ClientConfig ¶
type ClientConfig struct {
TargetBatchTime time.Duration
Concurrency int
Logger loggers.Advanced
OnDDL chan string
ServerID uint32
UseExperimentalBufferedMap bool
WriteDB *sql.DB // if not nil, use this DB for writes.
DBConfig *dbconn.DBConfig // Database configuration including TLS settings
}
func NewClientDefaultConfig ¶
func NewClientDefaultConfig() *ClientConfig
NewClientDefaultConfig returns a default config for the copier.
type LogWrapper ¶
type LogWrapper struct {
// contains filtered or unexported fields
}
func NewLogWrapper ¶
func NewLogWrapper(logger loggers.Advanced) *LogWrapper
If we pass our loggers.Advanced directly, it will write a bunch of spam to the log. So we use this hack to filter out the noisiest messages.
func (*LogWrapper) Debug ¶
func (c *LogWrapper) Debug(args ...any)
func (*LogWrapper) Debugf ¶
func (c *LogWrapper) Debugf(format string, args ...any)
func (*LogWrapper) Debugln ¶
func (c *LogWrapper) Debugln(args ...any)
func (*LogWrapper) Error ¶
func (c *LogWrapper) Error(args ...any)
func (*LogWrapper) Errorf ¶
func (c *LogWrapper) Errorf(format string, args ...any)
func (*LogWrapper) Errorln ¶
func (c *LogWrapper) Errorln(args ...any)
func (*LogWrapper) Fatal ¶
func (c *LogWrapper) Fatal(args ...any)
func (*LogWrapper) Fatalf ¶
func (c *LogWrapper) Fatalf(format string, args ...any)
func (*LogWrapper) Fatalln ¶
func (c *LogWrapper) Fatalln(args ...any)
func (*LogWrapper) Info ¶
func (c *LogWrapper) Info(args ...any)
func (*LogWrapper) Infof ¶
func (c *LogWrapper) Infof(format string, args ...any)
func (*LogWrapper) Infoln ¶
func (c *LogWrapper) Infoln(args ...any)
func (*LogWrapper) Panic ¶
func (c *LogWrapper) Panic(args ...any)
func (*LogWrapper) Panicf ¶
func (c *LogWrapper) Panicf(format string, args ...any)
func (*LogWrapper) Panicln ¶
func (c *LogWrapper) Panicln(args ...any)
func (*LogWrapper) Print ¶
func (c *LogWrapper) Print(args ...any)
func (*LogWrapper) Printf ¶
func (c *LogWrapper) Printf(format string, args ...any)
func (*LogWrapper) Println ¶
func (c *LogWrapper) Println(args ...any)
func (*LogWrapper) Warn ¶
func (c *LogWrapper) Warn(args ...any)
func (*LogWrapper) Warnf ¶
func (c *LogWrapper) Warnf(format string, args ...any)
func (*LogWrapper) Warnln ¶
func (c *LogWrapper) Warnln(args ...any)
type Subscription ¶
type Subscription interface {
HasChanged(key, row []any, deleted bool)
Length() int
Flush(ctx context.Context, underLock bool, lock *dbconn.TableLock) (allChangesFlushed bool, err error)
Tables() []*table.TableInfo // returns the tables related to the subscription in currentTable, newTable order
SetKeyAboveWatermarkOptimization(enabled bool)
}