Overview
Package change contains binary log subscription functionality.
Constants
const (
	// DefaultBatchSize is the fixed number of rows in each batched REPLACE/
	// DELETE statement that the binlog applier emits against the _new
	// table. Larger is better, but we need to keep the run-time of the
	// statement well below dbconn.maximumLockTime so that it doesn't
	// cause copy-row tasks to fail. On Aurora tables with out-of-cache
	// workloads that copy ~300 rows per second this is close to the safe
	// ceiling.
	//
	// This was previously the initial value for an adaptive sizer
	// (feedback() driven by p90 apply time). That mechanism was meaningful
	// when the applier issued `REPLACE INTO _new ... SELECT FROM source`
	// and S-locked rows on the live table, but after #853 the applier
	// emits inline VALUES against _new only (no source-side locks), and
	// the batches are strictly serial inside flushBatch. There's nothing
	// left to throttle, so the batch size is just a constant. See issue #869.
	DefaultBatchSize = 1000
	// DefaultFlushInterval is the interval at which the client flushes all
	// buffered binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second
	// DefaultSubscriptionSoftLimitBytes caps the approximate memory held
	// per subscription before HasChanged starts blocking on the buffered
	// map's condition variable. The cap is "soft": a single oversized
	// row admitted when the buffer is empty will exceed the limit, and
	// the next caller will park until that row drains. This keeps wide
	// rows (LONGTEXT / BLOB / large JSON) from OOMing the migrator
	// while still guaranteeing forward progress regardless of row width.
	// See pkg/change/subscription_buffered.go for the accounting model.
	//
	// Operators should be aware that pausing the binlog reader for an
	// extended period risks falling past the source's binlog retention
	// (binlog_expire_logs_seconds). Tune this value, or the source's
	// retention, accordingly.
	DefaultSubscriptionSoftLimitBytes = 256 << 20
	// DefaultTimeout is how long BlockWait waits before returning an error.
	DefaultTimeout = 30 * time.Second
)
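The "soft" cap described for DefaultSubscriptionSoftLimitBytes can be illustrated with a sync.Cond. The sketch below is a hypothetical stand-in (softLimitBuffer, Admit and Drain are invented names, not the bufferedMap in pkg/change): a caller parks only while the buffer is non-empty and the new bytes would push it past the cap, so an oversized row entering an empty buffer is always admitted.

```go
package main

import (
	"fmt"
	"sync"
)

// softLimitBuffer is a hypothetical illustration of soft-limit accounting;
// it is not the package's bufferedMap.
type softLimitBuffer struct {
	mu    sync.Mutex
	cond  *sync.Cond
	used  int64 // approximate bytes currently buffered
	limit int64 // soft cap; <= 0 disables blocking
}

func newSoftLimitBuffer(limit int64) *softLimitBuffer {
	b := &softLimitBuffer{limit: limit}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// Admit blocks while the buffer is non-empty and adding n bytes would
// exceed the cap. An empty buffer always admits, so a row wider than the
// cap still makes forward progress.
func (b *softLimitBuffer) Admit(n int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.limit > 0 && b.used > 0 && b.used+n > b.limit {
		b.cond.Wait()
	}
	b.used += n
}

// Drain simulates the flush path releasing n bytes and wakes parked callers.
func (b *softLimitBuffer) Drain(n int64) {
	b.mu.Lock()
	b.used -= n
	if b.used < 0 {
		b.used = 0
	}
	b.mu.Unlock()
	b.cond.Broadcast()
}

func main() {
	b := newSoftLimitBuffer(256 << 20) // same value as DefaultSubscriptionSoftLimitBytes
	b.Admit(300 << 20)                  // oversized row: admitted because the buffer is empty
	done := make(chan struct{})
	go func() {
		b.Admit(1) // parks if it runs before Drain: buffer is non-empty and over the cap
		close(done)
	}()
	b.Drain(300 << 20) // the flush path releases the wide row
	<-done
	fmt.Println(b.used) // 1
}
```

Broadcast rather than Signal is used here because a single drain may free enough room for several parked callers at once.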
Variables
var (
	// ErrChangesNotFlushed indicates that not all changes have been flushed from the replication feed.
	ErrChangesNotFlushed = errors.New("not all changes flushed")
)
var ErrPositionNotFound = errors.New("change.Source: cannot resume from position; it is no longer available on the source")
ErrPositionNotFound is returned by StartFromPosition when the underlying source can no longer resume from the requested opaque position, most commonly because the binlog file has been purged on a MySQL source. It is wrapped with %w, so callers can test for it with errors.Is.
Functions
func NewServerID
func NewServerID() uint32
NewServerID generates a unique server ID to avoid conflicts with other binlog readers. Uses crypto/rand combined with an atomic counter to ensure uniqueness even when called concurrently. Returns a value in the range 1001-4294967295 to avoid conflicts with typical MySQL server IDs (0-1000).
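The range arithmetic behind NewServerID can be sketched as follows. This is a hypothetical reconstruction, not the package's code: newServerIDSketch, serverIDCounter and the way random bytes and the counter are combined are assumptions. It only shows how a random value plus an atomic counter can be folded into [1001, 4294967295]; it does not by itself guarantee uniqueness.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"math"
	"sync/atomic"
)

// serverIDCounter is a hypothetical package-level counter.
var serverIDCounter atomic.Uint32

const minServerID = 1001

// newServerIDSketch mixes 4 random bytes with an atomic counter and maps
// the result into [1001, math.MaxUint32]. Illustration only.
func newServerIDSketch() uint32 {
	var buf [4]byte
	if _, err := rand.Read(buf[:]); err != nil {
		panic(err) // assumption: the real function's error handling is not documented
	}
	// uint32 addition wraps, which is fine before the modulo below.
	v := binary.BigEndian.Uint32(buf[:]) + serverIDCounter.Add(1)
	span := uint64(math.MaxUint32) - minServerID + 1 // 4294966295 possible IDs
	return uint32(minServerID + uint64(v)%span)
}

func main() {
	fmt.Println(newServerIDSketch())
}
```

Doing the modulo in uint64 keeps the final addition from overflowing: the largest result is 1001 + 4294966294 = 4294967295.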
Types
type BufferedSubscriptionConfig
type BufferedSubscriptionConfig struct {
// CurrentTable is the source-side TableInfo. Required.
CurrentTable *table.TableInfo
// NewTable is the destination-side TableInfo. May be nil for
// MoveTables/import flows where source and destination share the
// same schema; in that case Subscription.Tables() returns
// [CurrentTable, nil].
NewTable *table.TableInfo
// Applier writes batched changes to the target. Required.
Applier applier.Applier
// Chunker provides the watermark filter + column mapping. Required.
Chunker table.MappedChunker
// Logger receives diagnostic events. Defaults to slog.Default()
// when nil.
Logger *slog.Logger
// SoftLimitBytes is the per-subscription byte cap before
// HasChanged blocks waiting on the flush path. Zero disables the
// cap. See bufferedMap.softLimitBytes for the semantics.
SoftLimitBytes int64
}
BufferedSubscriptionConfig configures NewBufferedSubscription.
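The field comments above imply a small set of defaulting rules: CurrentTable, Applier and Chunker are required, a nil Logger falls back to slog.Default(), and a nil NewTable yields Tables() of [CurrentTable, nil]. The sketch below restates those rules with hypothetical stand-in types (TableInfo, Applier, MappedChunker, bufferedConfig, normalize, tables are all invented for illustration); how NewBufferedSubscription actually validates its config is not documented here.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

// Hypothetical stand-ins for table.TableInfo, applier.Applier and
// table.MappedChunker so the sketch compiles on its own.
type TableInfo struct{ SchemaName, TableName string }
type Applier interface{}
type MappedChunker interface{}

type bufferedConfig struct {
	CurrentTable   *TableInfo
	NewTable       *TableInfo // may be nil for MoveTables/import flows
	Applier        Applier
	Chunker        MappedChunker
	Logger         *slog.Logger
	SoftLimitBytes int64 // 0 disables the cap
}

// normalize applies the documented rules: required fields must be set and
// a nil Logger becomes slog.Default().
func normalize(cfg bufferedConfig) (bufferedConfig, error) {
	switch {
	case cfg.CurrentTable == nil:
		return cfg, errors.New("CurrentTable is required")
	case cfg.Applier == nil:
		return cfg, errors.New("Applier is required")
	case cfg.Chunker == nil:
		return cfg, errors.New("Chunker is required")
	}
	if cfg.Logger == nil {
		cfg.Logger = slog.Default()
	}
	return cfg, nil
}

// tables mirrors Subscription.Tables(): [CurrentTable, NewTable].
func tables(cfg bufferedConfig) []*TableInfo {
	return []*TableInfo{cfg.CurrentTable, cfg.NewTable}
}

func main() {
	cfg, err := normalize(bufferedConfig{
		CurrentTable: &TableInfo{SchemaName: "db", TableName: "t1"},
		Applier:      struct{}{},
		Chunker:      struct{}{},
	})
	fmt.Println(err == nil, cfg.Logger != nil, len(tables(cfg))) // true true 2
}
```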
type ClientConfig
type ClientConfig struct {
Logger *slog.Logger
ServerID uint32
DBConfig *dbconn.DBConfig // Database configuration including TLS settings
// CancelFunc is an optional callback from the caller (e.g. migration or move runner).
// It is called when a DDL change is detected on a subscribed table, or when a fatal
// stream error occurs (such as minimal RBR detection or exhausted streamer recreation
// attempts). The caller is expected to handle cancellation and cleanup.
// It returns true if the error was acted upon (caller actually cancelled),
// or false if it was ignored (e.g. because the caller is already past cutover).
CancelFunc func() bool
// DDLFilterSchema, when set, broadens DDL detection to cancel on any DDL change
// in the specified schema, rather than only on exact table matches against subscriptions.
// This is used by the move runner to detect DDL on any table in the source database.
DDLFilterSchema string
// DDLFilterTables, when set alongside DDLFilterSchema, narrows the schema-level
// DDL detection to only the specified table names. This is used for partial moves
// where only specific tables from a schema are being moved — DDL on unrelated
// tables in the same schema should not trigger cancellation.
// If empty (and DDLFilterSchema is set), all tables in the schema trigger cancellation.
DDLFilterTables []string
// SubscriptionSoftLimitBytes overrides DefaultSubscriptionSoftLimitBytes
// for new subscriptions. Set to a negative value to disable the cap
// entirely (HasChanged will never block on memory). Zero (the
// zero-value default) means use DefaultSubscriptionSoftLimitBytes.
SubscriptionSoftLimitBytes int64
}
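Two ClientConfig fields encode small decision rules: the DDL filters decide which DDL events trigger CancelFunc, and SubscriptionSoftLimitBytes uses negative/zero/positive to mean disabled/default/explicit. The sketch below restates both as plain functions. shouldCancelOnDDL and effectiveSoftLimit are hypothetical names; exact, case-sensitive name matching, treating the schema filter as a replacement for the per-subscription check, and passing 0 (which BufferedSubscriptionConfig.SoftLimitBytes treats as "no cap") for a negative override are all assumptions.

```go
package main

import (
	"fmt"
	"slices"
)

// Hypothetical copy of DefaultSubscriptionSoftLimitBytes.
const defaultSoftLimitBytes int64 = 256 << 20

// effectiveSoftLimit maps ClientConfig.SubscriptionSoftLimitBytes onto a
// per-subscription cap where 0 means "no cap" (assumed wiring).
func effectiveSoftLimit(v int64) int64 {
	switch {
	case v < 0:
		return 0 // negative disables the cap
	case v == 0:
		return defaultSoftLimitBytes // zero value selects the default
	default:
		return v
	}
}

// shouldCancelOnDDL applies the documented filter precedence. subscribed
// holds "schema.table" keys of registered subscriptions. Assumption: when
// filterSchema is set it replaces the per-subscription check.
func shouldCancelOnDDL(filterSchema string, filterTables []string, subscribed map[string]bool, schema, table string) bool {
	if filterSchema == "" {
		return subscribed[schema+"."+table] // exact table match only
	}
	if schema != filterSchema {
		return false
	}
	// Empty DDLFilterTables: every table in the schema triggers cancellation.
	return len(filterTables) == 0 || slices.Contains(filterTables, table)
}

func main() {
	subs := map[string]bool{"db.t1": true}
	fmt.Println(shouldCancelOnDDL("", nil, subs, "db", "t2"))              // false
	fmt.Println(shouldCancelOnDDL("db", nil, subs, "db", "t2"))            // true
	fmt.Println(shouldCancelOnDDL("db", []string{"t1"}, subs, "db", "t2")) // false
	fmt.Println(effectiveSoftLimit(-1), effectiveSoftLimit(0))             // 0 268435456
}
```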
func NewClientDefaultConfig
func NewClientDefaultConfig() *ClientConfig
NewClientDefaultConfig returns a default config for the copier.
type Source
type Source interface {
// AddSubscription constructs a bufferedMap from (currentTable,
// newTable, chunker) and registers it. ROW events matching the
// registered (schema, table) pair are pushed to the subscription's
// HasChanged. Must be called before Start / StartFromPosition.
AddSubscription(currentTable, newTable *table.TableInfo, chunker table.MappedChunker) error
// Start begins streaming from the current source head and spawns the
// reader goroutine. Returns once the reader is running; the stream
// itself continues until Close is called or ctx is cancelled.
// Implementations perform any required validation (privileges,
// connectivity, server settings) before returning.
Start(ctx context.Context) error
// StartFromPosition is the resume-time entry point. It primes the
// source's internal position to the opaque string previously
// returned by Position(), then begins streaming as if Start had
// been called. Implementations validate the position is still
// resumable (e.g. MySQL: the binlog file has not been purged); an
// unresumable position is returned wrapped with ErrPositionNotFound.
StartFromPosition(ctx context.Context, pos string) error
// Position returns the latest safe-to-resume position as an opaque
// string. The implementation owns the encoding; spirit never parses
// it. Advances only at transaction commit boundaries. Returns "" if
// no position has been observed yet, signaling that a fresh Start is
// required.
Position() string
// Flush requests that all registered subscriptions flush their
// buffered changes to their targets. Blocks until the flush
// completes or ctx cancels.
Flush(ctx context.Context) error
// FlushUnderTableLock is the cutover-time variant of Flush: the
// caller holds table locks and we drain the in-flight backlog
// against that quiescent state. locks carries one lock per target
// server being written to (a single lock for single-target
// migrations; one per shard for sharded moves) — the applier
// executes each target's statements under that target's own lock,
// since LOCK TABLES blocks writes from every other connection.
FlushUnderTableLock(ctx context.Context, locks []*dbconn.TableLock) error
// BlockWait blocks until all events received from the underlying
// stream up to call-time have been delivered to their subscriptions.
// Used by the runner around cutover to drain the in-flight backlog.
// Returns when drained or ctx cancels.
BlockWait(ctx context.Context) error
// GetDeltaLen returns the total number of pending changes across
// all registered subscriptions. Used by callers to decide whether
// the backlog is small enough to consider cutover.
GetDeltaLen() int
// SetWatermarkOptimization toggles the high/low watermark
// optimization across all subscriptions. Disabled before
// checksum/cutover to ensure all changes are flushed regardless of
// watermark position.
SetWatermarkOptimization(ctx context.Context, enabled bool) error
// StartPeriodicFlush spawns a background goroutine that flushes the
// changeset at the given interval. Used by the migrator to advance
// the safe-flushed position. Calling StartPeriodicFlush while a
// periodic flush is already running is a no-op.
StartPeriodicFlush(ctx context.Context, interval time.Duration)
// StopPeriodicFlush stops the goroutine started by
// StartPeriodicFlush. Safe to call when no periodic flush is
// running (no-op).
StopPeriodicFlush()
// AllChangesFlushed reports whether the buffered position has been
// caught up to the flushed position (i.e. no in-flight changes
// remain). For non-binlog implementations, this is equivalent to
// "have all received events been applied?".
AllChangesFlushed() bool
// Close releases all resources. Safe to call more than once.
Close()
}
Source is the abstraction spirit uses to consume a stream of row changes from a source database. It exists so spirit's replication pipeline is not pinned to the MySQL binlog protocol — alternative implementations (e.g. Vitess VStream) can plug in without touching the applier, the bufferedMap, or any other spirit-side machinery.
The built-in implementation that uses go-mysql's BinlogSyncer lives in this package and backs the existing Client. Out-of-tree implementations construct their own Source value and pass it to spirit via the Move/Migration config.
Lifecycle: construct → AddSubscription(...)* → Start(ctx) OR StartFromPosition(ctx, pos) → Flush / BlockWait / FlushUnderTableLock as needed → Close().
Events flow PUSH-style: when a row event matching one of the subscribed tables arrives, the source implementation looks up the Subscription whose Tables() includes that (schema, table) and calls sub.HasChanged(key, row, deleted) directly. There is no Next() / Recv() loop on this interface — the caller registers subscriptions and lets the source drive them.
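The push model amounts to a lookup from (schema, table) to a subscription followed by a direct HasChanged call. In this sketch, rowSink, dispatcher, register, onRowEvent and countingSink are hypothetical; keying by "schema.table" and registering only the source-side table are assumptions made to keep it short.

```go
package main

import "fmt"

// rowSink is a hypothetical subset of Subscription.
type rowSink interface {
	HasChanged(key, row []any, deleted bool)
}

// dispatcher routes row events to the subscription registered for
// (schema, table). Keying by "schema.table" is an assumption.
type dispatcher struct{ subs map[string]rowSink }

func (d *dispatcher) register(schema, table string, s rowSink) {
	if d.subs == nil {
		d.subs = make(map[string]rowSink)
	}
	d.subs[schema+"."+table] = s
}

// onRowEvent is what a source would call per decoded row. It reports
// whether the event was delivered; unsubscribed tables are ignored.
func (d *dispatcher) onRowEvent(schema, table string, key, row []any, deleted bool) bool {
	s, ok := d.subs[schema+"."+table]
	if !ok {
		return false
	}
	s.HasChanged(key, row, deleted)
	return true
}

// countingSink counts deliveries in place of a real Subscription.
type countingSink struct{ n int }

func (c *countingSink) HasChanged(key, row []any, deleted bool) { c.n++ }

func main() {
	d := &dispatcher{}
	sink := &countingSink{}
	d.register("db", "t1", sink)
	d.onRowEvent("db", "t1", []any{1}, []any{1, "x"}, false)
	d.onRowEvent("db", "t2", []any{7}, nil, true) // not subscribed: dropped
	fmt.Println(sink.n) // 1
}
```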
The surface area is intentionally broad to match the existing binlog-backed implementation so all spirit consumers (pkg/migration, pkg/move, pkg/checksum) program against the interface. Resume-time positions are opaque strings (Position / StartFromPosition) so that alternative implementations can encode whatever they need (file+offset, GTID, VStream position, etc.) without leaking to callers.
func NewBinlogClient
func NewBinlogClient(db *sql.DB, host string, username, password string, appl applier.Applier, config *ClientConfig) Source
NewBinlogClient constructs the binlog-backed change.Source. The returned Source talks to MySQL via go-mysql's BinlogSyncer; future alternative sources (e.g. VStream) will live behind their own constructors. The applier (the appl argument) is required.
func NewGTIDClient
func NewGTIDClient(db *sql.DB, host string, username, password string, appl applier.Applier, config *ClientConfig) Source
NewGTIDClient constructs the GTID-backed change.Source. It mirrors NewBinlogClient: the applier (the appl argument) is required.
EXPERIMENTAL. See docs in pkg/migration and pkg/move for the --gtid flag.
type Subscription
type Subscription interface {
HasChanged(key, row []any, deleted bool)
Length() int
// Flush writes the pending changes to the target(s) via the applier.
// When underLock is true, locks carries the table locks the caller is
// holding — one per target server — and the applier executes each
// target's statements under that target's own lock.
Flush(ctx context.Context, underLock bool, locks []*dbconn.TableLock) (allChangesFlushed bool, err error)
Tables() []*table.TableInfo // returns the tables related to the subscription in currentTable, newTable order
// SetWatermarkOptimization toggles both high and low watermark
// optimizations. For non-memory-comparable PKs toggling switches the
// subscription between map mode and queue mode; on such a transition
// it drains the outgoing store via the applier so only one store has
// pending entries at a time. Returns the drain error if any.
SetWatermarkOptimization(ctx context.Context, enabled bool) error
// Close signals that no further events will be delivered. Any HasChanged
// caller currently parked on backpressure (e.g. the bufferedMap soft
// memory limit) is unblocked so the binlog reader goroutine can exit.
// Close does NOT flush; pending changes are discarded along with the
// subscription. It is safe to call more than once.
Close()
}
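A toy map-mode subscription makes the HasChanged / Length / Close contract concrete. mapSubscription and pendingChange are hypothetical, and the rule that a later event for the same key replaces an earlier one is an assumption drawn from the note that longer flush intervals "permit more merging". The toy has no Flush and no backpressure, so its Close only discards pending changes; the real Close also unblocks callers parked on the soft memory limit.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingChange is the latest buffered state for one primary key.
type pendingChange struct {
	row     []any
	deleted bool
}

// mapSubscription is a hypothetical, heavily simplified map-mode buffer.
// It does not flush, has no backpressure, and is not the bufferedMap.
type mapSubscription struct {
	mu      sync.Mutex
	changes map[string]pendingChange
	closed  bool
}

func newMapSubscription() *mapSubscription {
	return &mapSubscription{changes: make(map[string]pendingChange)}
}

// HasChanged keeps only the newest event per key (assumed merge rule).
func (s *mapSubscription) HasChanged(key, row []any, deleted bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return // no further events after Close
	}
	// %#v distinguishes e.g. []any{"ab"} from []any{"a", "b"}.
	s.changes[fmt.Sprintf("%#v", key)] = pendingChange{row: row, deleted: deleted}
}

// Length reports the number of distinct keys with pending changes.
func (s *mapSubscription) Length() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.changes)
}

// Close discards pending changes without flushing; calling it again is harmless.
func (s *mapSubscription) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.closed = true
	s.changes = make(map[string]pendingChange)
}

func main() {
	s := newMapSubscription()
	s.HasChanged([]any{1}, []any{1, "a"}, false)
	s.HasChanged([]any{1}, []any{1, "b"}, false) // replaces the first event for key 1
	s.HasChanged([]any{2}, nil, true)
	fmt.Println(s.Length()) // 2
	s.Close()
	fmt.Println(s.Length()) // 0
}
```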
func NewBufferedSubscription
func NewBufferedSubscription(cfg BufferedSubscriptionConfig) (Subscription, error)
NewBufferedSubscription constructs the default bufferedMap-backed Subscription. It is the public counterpart to binlogClient's internal AddSubscription helper: out-of-tree change.Source implementations (e.g. strata's pkg/vstream) call this from their own AddSubscription to build a Subscription the runner / copier can drive.
The returned Subscription is not yet wired into a registry — the caller is responsible for storing it and routing row events to its HasChanged method. The internal sync.Cond is initialised before return (matching subscriptionRegistry.AddBuffered) so HasChanged / Flush / SetWatermarkOptimization are safe to call immediately.