Documentation ¶
Overview ¶
Package repl contains binary log subscription functionality.
Index ¶
- Constants
- Variables
- func NewServerID() uint32
- type Client
- func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.MappedChunker) error
- func (c *Client) AllChangesFlushed() bool
- func (c *Client) BlockWait(ctx context.Context) error
- func (c *Client) Close()
- func (c *Client) Flush(ctx context.Context) error
- func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error
- func (c *Client) GetBinlogApplyPosition() mysql.Position
- func (c *Client) GetDeltaLen() int
- func (c *Client) Run(ctx context.Context) error
- func (c *Client) SetFlushedPos(pos mysql.Position)
- func (c *Client) SetWatermarkOptimization(ctx context.Context, newVal bool) error
- func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
- func (c *Client) StopPeriodicFlush()
- type ClientConfig
- type Subscription
Constants ¶
const (
	// DefaultBatchSize is the fixed number of rows in each batched REPLACE/
	// DELETE statement that the binlog applier emits against the _new
	// table. Larger is better, but we need to keep the run-time of the
	// statement well below dbconn.maximumLockTime so that it doesn't
	// cause copy-row tasks to fail. On Aurora tables with
	// out-of-cache workloads that copy ~300 rows per second this is close
	// to the safe ceiling.
	//
	// This was previously the initial value for an adaptive sizer (feedback()
	// driven by p90 apply time). That mechanism was meaningful when the
	// applier issued `REPLACE INTO _new ... SELECT FROM source` and S-locked
	// rows on the live table, but after #853 the applier emits inline
	// VALUES against _new only (no source-side locks), and the batches
	// are strictly serial inside flushBatch. There's nothing left to
	// throttle, so the batch size is just a constant. See issue #869.
	DefaultBatchSize = 1000

	// DefaultFlushInterval is how often the client flushes all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second

	// DefaultSubscriptionSoftLimitBytes caps the approximate memory held
	// per subscription before HasChanged starts blocking on the buffered
	// map's condition variable. The cap is "soft": a single oversized
	// row admitted when the buffer is empty will exceed the limit, and
	// the next caller will park until that row drains. This keeps wide
	// rows (LONGTEXT / BLOB / large JSON) from OOMing the migrator
	// while still guaranteeing forward progress regardless of row width.
	// See pkg/repl/subscription_buffered.go for the accounting model.
	//
	// Operators should be aware that pausing the binlog reader for an
	// extended period risks falling past the source's binlog retention
	// (binlog_expire_logs_seconds). Tune this value, or the source's
	// retention, accordingly.
	DefaultSubscriptionSoftLimitBytes = 256 << 20

	// DefaultTimeout is how long BlockWait waits before returning an error.
	DefaultTimeout = 30 * time.Second
)
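The "soft" cap described for DefaultSubscriptionSoftLimitBytes can be illustrated with a condition variable. The following is only a minimal sketch of the admission rule as the comment states it, not the accounting in pkg/repl/subscription_buffered.go; `softBuffer`, `add`, `drain` and `usedBytes` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// softBuffer is a hypothetical stand-in for a per-subscription buffer.
type softBuffer struct {
	mu    sync.Mutex
	cond  *sync.Cond
	limit int64 // soft cap in bytes; <= 0 disables the cap in this sketch
	used  int64 // approximate bytes currently held
}

func newSoftBuffer(limit int64) *softBuffer {
	b := &softBuffer{limit: limit}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// add parks while the buffer is non-empty and the row would push it past
// the limit. An empty buffer always admits, which is what makes the cap
// "soft" and guarantees forward progress for oversized rows.
func (b *softBuffer) add(size int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.limit > 0 && b.used > 0 && b.used+size > b.limit {
		b.cond.Wait()
	}
	b.used += size
}

// drain releases size bytes and wakes any parked callers.
func (b *softBuffer) drain(size int64) {
	b.mu.Lock()
	b.used -= size
	b.mu.Unlock()
	b.cond.Broadcast()
}

func (b *softBuffer) usedBytes() int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.used
}

func main() {
	b := newSoftBuffer(100)
	b.add(250) // oversized row is admitted because the buffer is empty

	done := make(chan struct{})
	go func() {
		b.add(10) // parks until the oversized row drains
		close(done)
	}()

	b.drain(250)
	<-done
	fmt.Println(b.usedBytes()) // 10
}
```

While a caller is parked in `add`, nothing is read from the stream in this model, which mirrors the binlog retention warning in the comment.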
Variables ¶
var ( // ErrChangesNotFlushed indicates that not all changes have been flushed from the replication feed. ErrChangesNotFlushed = errors.New("not all changes flushed") )
Functions ¶
func NewServerID ¶
func NewServerID() uint32
NewServerID generates a unique server ID to avoid conflicts with other binlog readers. Uses crypto/rand combined with an atomic counter to ensure uniqueness even when called concurrently. Returns a value in the range 1001-4294967295 to avoid conflicts with typical MySQL server IDs (0-1000).
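One way to combine crypto/rand with an atomic counter and stay inside 1001-4294967295 is sketched below. This is an illustration under assumptions, not the package's implementation; `sketchServerID`, `minServerID` and `idSpan` are hypothetical names.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"math"
	"sync/atomic"
)

// minServerID is the lowest ID this sketch will return.
const minServerID uint32 = 1001

// idSpan is the number of distinct values in [minServerID, MaxUint32].
const idSpan uint32 = math.MaxUint32 - minServerID + 1

// serverIDCounter makes concurrent calls differ even if the random
// source were to return the same bytes twice.
var serverIDCounter atomic.Uint32

// sketchServerID mixes four random bytes with the counter and folds the
// result into the allowed range.
func sketchServerID() uint32 {
	var buf [4]byte
	if _, err := rand.Read(buf[:]); err != nil {
		panic(err) // a real implementation would likely fall back instead
	}
	r := binary.BigEndian.Uint32(buf[:])
	n := serverIDCounter.Add(1)
	return minServerID + (r+n)%idSpan
}

func main() {
	id := sketchServerID()
	fmt.Println(id >= minServerID) // true
}
```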
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(db *sql.DB, host string, username, password string, appl applier.Applier, config *ClientConfig) *Client
NewClient creates a new Client instance. The applier (appl) is required!
func (*Client) AddSubscription ¶
func (c *Client) AddSubscription(currentTable, newTable *table.TableInfo, chunker table.MappedChunker) error
AddSubscription adds a new subscription. Returns an error if a subscription already exists for the given table.
func (*Client) AllChangesFlushed ¶
func (c *Client) AllChangesFlushed() bool
func (*Client) BlockWait ¶
func (c *Client) BlockWait(ctx context.Context) error
BlockWait blocks until all changes are *buffered*. For example, if the server's current position is 1234 but our buffered position is only 100, we need to read all the events until we reach >= 1234. We do not need to guarantee that they are flushed though, so you need to call Flush() to do that. This call times out! The default timeout is DefaultTimeout (30 seconds), after which an error will be returned.
func (*Client) Flush ¶
func (c *Client) Flush(ctx context.Context) error
Flush empties the changeset in a loop until the number of pending changes is considered "trivial". The loop is required because changes continue to be added while the flush is occurring.
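The flush-until-trivial loop can be sketched as follows. The threshold value and all names here (`trivialThreshold`, `changeset`, `flushUntilTrivial`) are hypothetical; the simulation simply replays a fixed list of changes that "arrive" during each pass.

```go
package main

import "fmt"

// trivialThreshold is a hypothetical cut-off; the source does not state
// the value used by the package.
const trivialThreshold = 10

// changeset simulates pending changes plus the changes that arrive while
// each flush pass is running.
type changeset struct {
	pending  int
	arrivals []int
}

// flushOnce applies everything that was pending when the pass started.
// Whatever arrived during the pass becomes the new pending count.
func (c *changeset) flushOnce() {
	c.pending = 0
	if len(c.arrivals) > 0 {
		c.pending = c.arrivals[0]
		c.arrivals = c.arrivals[1:]
	}
}

// flushUntilTrivial repeats flush passes until what remains is small
// enough to leave for a later flush, and returns the number of passes.
func flushUntilTrivial(c *changeset) int {
	passes := 0
	for {
		c.flushOnce()
		passes++
		if c.pending <= trivialThreshold {
			return passes
		}
	}
}

func main() {
	c := &changeset{pending: 5000, arrivals: []int{500, 40, 3}}
	fmt.Println(flushUntilTrivial(c), c.pending) // 3 3
}
```

Each pass is shorter than the last because fewer changes accumulate during a shorter flush, so the loop converges as long as the apply rate exceeds the arrival rate.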
func (*Client) FlushUnderTableLock ¶
func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error
FlushUnderTableLock is a final flush under an exclusive table lock using the connection that holds a write lock. Because flushing generates binary log events, we actually want to call flush *twice*:
- The first time flushes the pending changes to the new table.
- We then ensure that we have all the binary log changes read from the server.
- The second time reads through the changes generated by the first flush and updates the in-memory applied position to match the server's position. This is required so that the binlog position is up to date for the c.AllChangesFlushed() check.
func (*Client) GetBinlogApplyPosition ¶
func (c *Client) GetBinlogApplyPosition() mysql.Position
func (*Client) GetDeltaLen ¶
func (c *Client) GetDeltaLen() int
GetDeltaLen returns the total number of changes that are pending across all subscriptions.
func (*Client) Run ¶
func (c *Client) Run(ctx context.Context) error
Run initializes the binlog syncer and starts the binlog reader. It returns an error if the initialization fails.
func (*Client) SetFlushedPos ¶
func (c *Client) SetFlushedPos(pos mysql.Position)
SetFlushedPos updates the known safe position that all changes have been flushed. It is used for resuming from a checkpoint.
func (*Client) SetWatermarkOptimization ¶ added in v0.10.1
func (c *Client) SetWatermarkOptimization(ctx context.Context, newVal bool) error
SetWatermarkOptimization sets both high and low watermark optimizations for all subscriptions. This should be disabled before checksum/cutover to ensure all changes are flushed regardless of watermark position.
Each subscription may drain its outgoing store on the toggle (see bufferedMap.SetWatermarkOptimization), so this can fail with the drain error. If one subscription fails, subsequent subscriptions are not touched and the caller should treat the operation as not-yet-applied.
Subscriptions are toggled against a snapshot so a long-running drain on one subscription doesn't block processRowsEvent from finding subscriptions for unrelated tables.
func (*Client) StartPeriodicFlush ¶
func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
StartPeriodicFlush starts a goroutine that periodically flushes the binlog changeset, used by the migrator to advance the binlog position. Registration of the cancel/done pair happens synchronously in the caller's goroutine before the loop is spawned, so a follow-up StopPeriodicFlush is guaranteed to observe the registration. Callers MUST NOT prefix with `go`; the loop is spawned internally.
Calling StartPeriodicFlush while a periodic flush is already running is a no-op.
func (*Client) StopPeriodicFlush ¶
func (c *Client) StopPeriodicFlush()
StopPeriodicFlush stops the periodic flush goroutine started by StartPeriodicFlush and blocks until that goroutine has fully exited. Safe to call when no periodic flush is running (no-op).
type ClientConfig ¶
type ClientConfig struct {
Logger *slog.Logger
ServerID uint32
DBConfig *dbconn.DBConfig // Database configuration including TLS settings
// CancelFunc is an optional callback from the caller (e.g. migration or move runner).
// It is called when a DDL change is detected on a subscribed table, or when a fatal
// stream error occurs (such as minimal RBR detection or exhausted streamer recreation
// attempts). The caller is expected to handle cancellation and cleanup.
// It returns true if the error was acted upon (caller actually cancelled),
// or false if it was ignored (e.g. because the caller is already past cutover).
CancelFunc func() bool
// DDLFilterSchema, when set, broadens DDL detection to cancel on any DDL change
// in the specified schema, rather than only on exact table matches against subscriptions.
// This is used by the move runner to detect DDL on any table in the source database.
DDLFilterSchema string
// DDLFilterTables, when set alongside DDLFilterSchema, narrows the schema-level
// DDL detection to only the specified table names. This is used for partial moves
// where only specific tables from a schema are being moved — DDL on unrelated
// tables in the same schema should not trigger cancellation.
// If empty (and DDLFilterSchema is set), all tables in the schema trigger cancellation.
DDLFilterTables []string
// SubscriptionSoftLimitBytes overrides DefaultSubscriptionSoftLimitBytes
// for new subscriptions. Set to a negative value to disable the cap
// entirely (HasChanged will never block on memory). Zero (the
// zero-value default) means use DefaultSubscriptionSoftLimitBytes.
SubscriptionSoftLimitBytes int64
}
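The field comments for SubscriptionSoftLimitBytes, DDLFilterSchema and DDLFilterTables describe small decision rules that can be written out directly. The helpers below are hypothetical and only restate those comments; exact-match detection against subscriptions (the behaviour when DDLFilterSchema is unset) is not modelled, and name comparison is assumed to be case-sensitive.

```go
package main

import "fmt"

// defaultSoftLimit mirrors DefaultSubscriptionSoftLimitBytes (256 << 20).
const defaultSoftLimit int64 = 256 << 20

// effectiveSoftLimit resolves a configured value: negative disables the
// cap, zero selects the default, and a positive value is used as given.
func effectiveSoftLimit(configured int64) (limit int64, capped bool) {
	switch {
	case configured < 0:
		return 0, false
	case configured == 0:
		return defaultSoftLimit, true
	default:
		return configured, true
	}
}

// schemaDDLCancels reports whether a DDL on ddlSchema.ddlTable should
// trigger cancellation under the schema-level filter.
func schemaDDLCancels(ddlSchema, ddlTable, filterSchema string, filterTables []string) bool {
	if filterSchema == "" || ddlSchema != filterSchema {
		return false
	}
	if len(filterTables) == 0 {
		return true // every table in the schema triggers cancellation
	}
	for _, t := range filterTables {
		if t == ddlTable {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(effectiveSoftLimit(0))  // 268435456 true
	fmt.Println(effectiveSoftLimit(-1)) // 0 false

	fmt.Println(schemaDDLCancels("shop", "orders", "shop", nil))                // true
	fmt.Println(schemaDDLCancels("shop", "audit", "shop", []string{"orders"})) // false
}
```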
func NewClientDefaultConfig ¶
func NewClientDefaultConfig() *ClientConfig
NewClientDefaultConfig returns a default config for the Client.
type Subscription ¶
type Subscription interface {
HasChanged(key, row []any, deleted bool)
Length() int
Flush(ctx context.Context, underLock bool, lock *dbconn.TableLock) (allChangesFlushed bool, err error)
Tables() []*table.TableInfo // returns the tables related to the subscription in currentTable, newTable order
// SetWatermarkOptimization toggles both high and low watermark
// optimizations. For non-memory-comparable PKs toggling switches the
// subscription between map mode and queue mode; on such a transition
// it drains the outgoing store via the applier so only one store has
// pending entries at a time. Returns the drain error if any.
SetWatermarkOptimization(ctx context.Context, enabled bool) error
// Close signals that no further events will be delivered. Any HasChanged
// caller currently parked on backpressure (e.g. the bufferedMap soft
// memory limit) is unblocked so the binlog reader goroutine can exit.
// Close does NOT flush; pending changes are discarded along with the
// subscription. It is safe to call more than once.
Close()
}