change

package
v0.15.0
Published: Jun 15, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

README

Change Source

This package defines change.Source — the abstraction spirit uses to consume a stream of row changes from a source database — and the binlog-backed implementation behind NewBinlogClient. The implementation tracks changes by acting as a MySQL replica; the go-mysql library handles the connection and binary-log parsing, and spirit's role is to manage subscriptions for each table being migrated, deduplicate changes, and coordinate with the copier to avoid redundant work.

The interface is source-agnostic: resume positions are opaque strings, lifecycle is Start / StartFromPosition / Close, and additional implementations (e.g. Vitess VStream) can plug in without touching the applier, the bufferedMap, or the migration runner. See source.go for the full interface.

Each table tracked is represented by a subscription. There is a single subscription type — the buffered map — that stores the full row image from the binlog and applies it through the applier. For non-memory-comparable primary keys it falls back to a FIFO queue internally once the watermark optimization is disabled, but row images are still preserved and the applier path is still used.

Subscription Implementation

Background

Earlier versions of Spirit shipped two subscription types side-by-side: a deltaMap that stored only primary-key hashes (and re-read row state from the source via REPLACE INTO ... SELECT at flush time), and a deltaQueue that preserved binlog order for non-memory-comparable PKs. The split caused issue #746: MySQL's binlog-vs-visibility ordering meant that the deltaMap path could read a stale row image when its SELECT raced ahead of the row's commit visibility, applying the wrong final state.

The fix was to unify everything around a single subscription type — the buffered map — that captures the full row image from the binlog directly, so the applied state is the binlog state and the source-side SELECT race is gone. The deltaMap and deltaQueue types were removed entirely; the FIFO behaviour previously provided by deltaQueue now lives inside bufferedMap as an internal mode for non-memory-comparable PKs (see below).

Buffered Map

The buffered map stores the full row image directly from the binlog and applies it through the applier interface.

How it works:

  • Maintains a map of primaryKeyHash -> (isDelete, fullRowImage).
  • Multiple changes to the same row are automatically deduplicated (only the final state is stored).
  • Uses the applier's UpsertRows and DeleteKeys to write changes — there is no SELECT FROM original round-trip.
  • Flushes changes through the applier's parallel write workers.
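The deduplication described above can be sketched in a few lines of Go. This is a minimal illustration, not spirit's bufferedMap: the `change` struct, the `hasChanged` and `split` helpers, and the use of `fmt.Sprintf` as a key hash are all assumptions made for the example.

```go
package main

import "fmt"

// change is a hypothetical stand-in for one buffered entry.
type change struct {
	isDelete bool
	row      []any // full row image; nil for deletes
}

// bufferedMap keeps only the latest change per primary key.
type bufferedMap struct {
	changes map[string]change
}

func newBufferedMap() *bufferedMap {
	return &bufferedMap{changes: make(map[string]change)}
}

// hasChanged records an event, overwriting any earlier event for the same key.
func (b *bufferedMap) hasChanged(key, row []any, deleted bool) {
	k := fmt.Sprintf("%v", key) // simplistic key hash, for illustration only
	if deleted {
		b.changes[k] = change{isDelete: true}
		return
	}
	b.changes[k] = change{row: row}
}

// split partitions pending changes into rows to upsert and keys to delete.
func (b *bufferedMap) split() (upserts [][]any, deletes []string) {
	for k, c := range b.changes {
		if c.isDelete {
			deletes = append(deletes, k)
		} else {
			upserts = append(upserts, c.row)
		}
	}
	return upserts, deletes
}

func main() {
	b := newBufferedMap()
	b.hasChanged([]any{1}, []any{1, "v1"}, false)
	b.hasChanged([]any{1}, []any{1, "v2"}, false)
	b.hasChanged([]any{1}, []any{1, "v3"}, false)
	b.hasChanged([]any{2}, nil, true)

	upserts, deletes := b.split()
	fmt.Println(len(upserts), len(deletes)) // prints "1 1"
}
```

Three events for id=1 collapse into a single upsert carrying the last image, which is the property the applier relies on.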

Advantages:

  • Excellent deduplication: if a row is modified 100 times, only one upsert is performed.
  • Parallel flushing: independent keys can be written concurrently via the applier.
  • No source-side reads at flush: the row image is already in memory, so no contention with OLTP traffic on the source.
  • Sidesteps the binlog/visibility race: because the row image is the applied state, there is no opportunity for MySQL's binlog-vs-visibility ordering to surface a stale row (see issue #746). This also makes spirit safe to run against sources configured with semi-synchronous replication, which can widen that window by tens or hundreds of milliseconds depending on replica ACK latency. The mysql-semisync-docker.yml CI lane exercises this configuration end-to-end.
  • Watermark optimization (when supported by the chunker): can skip ranges of keys using both KeyAboveHighWatermark and KeyBelowLowWatermark.
  • Cross-server compatibility: the applier can target a different MySQL server, which is what pkg/move relies on.

Limitations:

  • Requires binlog_row_image=FULL and an empty binlog_row_value_options (the applier needs the complete row image).
  • Higher memory usage than a key-only map: stores full row data for each changed key.
  • Watermark optimizations (KeyAboveHighWatermark and KeyBelowLowWatermark) are available on MappedChunker implementations (both optimistic and composite chunkers). They work correctly for numeric, binary, and temporal primary key types. For VARCHAR/TEXT columns with collations, Go's byte-order comparison may differ from MySQL's collation order; any discrepancies are caught by the checksum phase (see issue #479).

Map iteration order is irrelevant because the applier issues REPLACE INTO target VALUES (...), which deletes any row that conflicts on PRIMARY KEY or any UNIQUE index before each insert. That makes the multi-row VALUES list order-independent — see "Applier idempotence via REPLACE INTO" below.

Example scenario:

Binlog events:  INSERT(id=1, ...), UPDATE(id=1, ...), UPDATE(id=1, ...), DELETE(id=2)
Buffered map:   {1: {row: <latest image>}, 2: {isDelete}}
Applied:        UpsertRows({id=1, ...}); DeleteKeys({id=2});

FIFO fallback for non-memory-comparable primary keys

For tables with non-memory-comparable primary keys (e.g. VARCHAR with a case-insensitive collation), the subscription uses last-write-wins (LWW) buffered-map dedup during the copy phase and switches to an internal FIFO queue post-copy. The queue still stores row images inline and applies them via the applier; there is no REPLACE INTO ... SELECT, so the #746 fix and cross-server move support (issue #607) are preserved. The queue exists only to preserve binlog order: collation-equivalent keys like "A" and "a" hash to different map slots but resolve to the same MySQL row, so a map's non-deterministic iteration would apply events out of order. FIFO replay through the applier preserves binlog order; the target's own collation-aware uniqueness then collapses the events onto the right row.

During the copy phase the chunker's own SELECT covers in-window case-collision races, so LWW map dedup is safe and considerably faster. When the watermark optimization is disabled at the end of the copy phase, SetWatermarkOptimization drains the map inline and the subscription switches into queue mode for the cutover/checksum window. The post-cutover checksum (with FixDifferences=true) repairs any residual divergence.

Memory-comparable PKs always use the buffered map, since map-key equality matches MySQL row identity.
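To see why ordering matters for collation-equivalent keys, the following sketch replays the same two events in binlog order and in reversed order. The `target` type lowercases keys as a crude stand-in for a case-insensitive collation; the `event`, `target`, and `replay` names are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a hypothetical row change keyed by a string primary key.
type event struct {
	key     string
	value   string
	deleted bool
}

// target mimics a table whose primary key uses a case-insensitive
// collation; lowercasing is only an approximation of real collation rules.
type target map[string]string

func (t target) apply(e event) {
	k := strings.ToLower(e.key)
	if e.deleted {
		delete(t, k)
		return
	}
	t[k] = e.value
}

// replay applies events in the order given and returns the final state.
func replay(events []event) target {
	t := target{}
	for _, e := range events {
		t.apply(e)
	}
	return t
}

func main() {
	// Binlog order: the row keyed "a" is deleted, then re-inserted as "A".
	binlogOrder := []event{
		{key: "a", deleted: true},
		{key: "A", value: "new"},
	}
	fifo := replay(binlogOrder)
	reordered := replay([]event{binlogOrder[1], binlogOrder[0]})

	fmt.Println(len(fifo), len(reordered)) // prints "1 0"
}
```

A Go map would hold "a" and "A" in separate slots, so either order could come out of iteration; the reversed order loses the row, which is what the FIFO mode guards against.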

Applier idempotence via REPLACE INTO (#847)

The applier writes a multi-row statement per batch. We use:

REPLACE INTO target (cols) VALUES (...), (...), ...;

rather than INSERT ... ON DUPLICATE KEY UPDATE. The choice matters whenever two rows in the same batch can collide on a unique key — typically because a source-side transaction legally moves a unique value between rows:

-- Legal in source: deactivate one row, then activate another,
-- inside a single transaction. UNIQUE(slot_id) allows NULLs to
-- duplicate, so the invariant holds.
START TRANSACTION;
UPDATE t SET slot_id = NULL  WHERE id = 1;  -- was 'S'
UPDATE t SET slot_id = 'S'   WHERE id = 2;  -- was NULL
COMMIT;

With INSERT ... ON DUPLICATE KEY UPDATE, MySQL processes the multi-row VALUES list in list order and resolves only the first conflict on each row (via the UPDATE clause). If the resulting update introduces a second unique-key collision, the statement fails with Error 1062. The map's randomized iteration meant a swap pair could land "activate-first" in the batch, hitting that exact failure.

REPLACE INTO is order-independent for this case. Per the MySQL documentation:

REPLACE works exactly like INSERT, except that if an old row in the table has the same value as a new row for a PRIMARY KEY or a UNIQUE index, the old row is deleted before the new row is inserted.

So each row's conflicts — on PK or any unique index — are deleted before that row's insert runs, irrespective of where the conflicting row sits in the batch. The swap pair collapses to "delete the previous holder, insert the new holder" and the order of the two events inside the batch doesn't matter.

This is the same robustness the pre-#821 deltaMap had with REPLACE INTO target SELECT FROM source, but without the read-after-commit race that motivated #746 — we supply the inline row image, not a SELECT against source.
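As a rough illustration of the statement shape, the sketch below assembles a multi-row REPLACE with `?` placeholders. It assumes a prepared-statement style and does no identifier quoting; `buildReplace` is a hypothetical helper, and whether the applier uses placeholders or inlined literals is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// buildReplace returns a multi-row REPLACE statement using "?" placeholders,
// plus the flattened argument list in row order. Identifiers are not quoted
// or escaped, so this is only suitable as an illustration.
func buildReplace(table string, cols []string, rows [][]any) (string, []any) {
	rowPlaceholder := "(" + strings.TrimSuffix(strings.Repeat("?, ", len(cols)), ", ") + ")"

	placeholders := make([]string, 0, len(rows))
	args := make([]any, 0, len(rows)*len(cols))
	for _, r := range rows {
		placeholders = append(placeholders, rowPlaceholder)
		args = append(args, r...)
	}

	stmt := fmt.Sprintf("REPLACE INTO %s (%s) VALUES %s",
		table, strings.Join(cols, ", "), strings.Join(placeholders, ", "))
	return stmt, args
}

func main() {
	// The swap pair from the example above: id=1 gives up 'S', id=2 claims it.
	stmt, args := buildReplace("target",
		[]string{"id", "slot_id"},
		[][]any{{1, nil}, {2, "S"}},
	)
	fmt.Println(stmt)
	fmt.Println(len(args))
}
```

Because REPLACE resolves conflicts per row, the order of the two tuples in the VALUES list does not need to be controlled by the caller.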

Eventual consistency between batches

REPLACE's "delete any unique-key conflict before each insert" semantics mean a single REPLACE statement can delete more rows than the ones in its VALUES list: specifically, any row currently in the destination that previously held a unique value the new row is now claiming. That row is briefly missing from the destination until its own event arrives in a later batch (or in the same batch but processed later) and re-inserts it.

Concretely, for the swap pair above with batches of size 1:

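| Step | Batch                     | Destination state      |
|------|---------------------------|------------------------|
| 0    | (initial)                 | id=1: 'S', id=2: NULL  |
| 1    | REPLACE (id=1, slot=NULL) | id=1: NULL, id=2: NULL |
| 2    | REPLACE (id=2, slot='S')  | id=1: NULL, id=2: 'S'  |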

And for the same swap pair if the activate landed first across batches:

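| Step | Batch                     | Destination state                                        |
|------|---------------------------|----------------------------------------------------------|
| 0    | (initial)                 | id=1: 'S', id=2: NULL                                    |
| 1    | REPLACE (id=2, slot='S')  | id=2: 'S' (id=1 deleted: unique-key conflict on 'S')     |
| 2    | REPLACE (id=1, slot=NULL) | id=1: NULL, id=2: 'S' (id=1 re-inserted)                 |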

Binlog ordering gives us the first table in practice — within a single source-side transaction, the deactivate event has a lower binlog position than the activate — but Spirit's correctness does not depend on which case occurs. The destination converges to source's current state once the last unflushed event for each affected PK has been applied.

This eventual consistency is safe because the bufferedMap is an up-to-date and disjoint representation of pending changes: every PK appears at most once at flush time, holding the latest row image MySQL emitted for it. Any row transiently deleted by REPLACE's conflict resolution is therefore guaranteed to have its own event in the buffer (or arriving shortly) — its row image isn't lost, just temporarily not yet applied. The post-cutover checksum (with FixDifferences=true) is the backstop for anything that slips through.

See TestBufferedMapSwapPairFlushesViaReplace (unit) and TestSwapPairEndToEndViaReplace (end-to-end) for the regression gates.
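The convergence argument can be checked with a small in-memory model of REPLACE. This is a simplified sketch under stated assumptions: the `row`, `table`, `replace`, and `seed` names are hypothetical, the empty string stands in for NULL, and only one unique column is modelled.

```go
package main

import "fmt"

// row models t(id PRIMARY KEY, slot UNIQUE NULL); "" stands in for NULL.
type row struct {
	id   int
	slot string
}

// table is an in-memory destination keyed by primary key.
type table map[int]row

// replace mimics REPLACE: delete any row that conflicts on the primary key
// or on a non-NULL unique slot, then insert the new row.
func (t table) replace(r row) {
	delete(t, r.id)
	for id, old := range t {
		if r.slot != "" && old.slot == r.slot {
			delete(t, id) // deleting during range is safe in Go
		}
	}
	t[r.id] = r
}

// seed returns the starting state: id=1 holds 'S', id=2 holds NULL.
func seed() table {
	return table{1: {1, "S"}, 2: {2, ""}}
}

func main() {
	deactivate, activate := row{1, ""}, row{2, "S"}

	// Binlog order: deactivate first, then activate.
	a := seed()
	a.replace(deactivate)
	a.replace(activate)

	// Reversed order: id=1 is transiently deleted, then re-inserted.
	b := seed()
	b.replace(activate)
	fmt.Println(len(b)) // prints 1
	b.replace(deactivate)

	fmt.Println(len(a) == len(b) && a[1] == b[1] && a[2] == b[2]) // prints true
}
```

Both orders end in the same state; only the intermediate state differs, which matches the two tables above.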

Features

Watermark Optimization

The watermark optimization is a critical performance feature that prevents the replication client from doing redundant work during the copy phase.

The Problem: During the initial copy phase, the copier is reading rows from the source table and writing them to the new table. Meanwhile, the replication client is also receiving binlog events for those same rows. Without optimization, we would:

  1. Copy row with id=1000 from source to target
  2. Receive a binlog event for id=1000 (from before the copy)
  3. Apply the binlog change, overwriting what we just copied
  4. Result: Wasted work and potential deadlocks

The Solution: The copier maintains a "watermark" representing its progress. The replication client uses this watermark to filter changes:

  • High watermark: Skip changes for rows that haven't been copied yet (they'll be picked up by the copier)
  • Low watermark: Skip changes for rows that are currently being copied (avoid races with the copier, which may cause deadlocks/lock waits)

// The two checks below run in different places: the first returns
// from a handler, the second sits inside a loop.
if chunker.KeyAboveHighWatermark(key[0]) {
    return  // Skip, copier will handle this
}

if !chunker.KeyBelowLowWatermark(key[0]) {
    continue  // Skip, copier is actively working on this range
}

Important: The watermark optimization is disabled before the final cutover to ensure all changes are applied regardless of the copier's position.
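The filter can be pictured as a three-way classification of each key. In the sketch below the method names come from the snippet above, but the `chunker` interface shape, the `intChunker` type, the `classify` helper, and the exact boundary comparisons are assumptions made for illustration.

```go
package main

import "fmt"

// chunker is a hypothetical slice of the chunker API used by the filter.
type chunker interface {
	KeyAboveHighWatermark(key any) bool
	KeyBelowLowWatermark(key any) bool
}

// intChunker is a toy chunker over int64 keys. Keys below low are fully
// copied, keys above high are not yet reached, keys in between are in flight.
type intChunker struct{ low, high int64 }

func (c intChunker) KeyAboveHighWatermark(key any) bool {
	k, ok := key.(int64)
	return ok && k > c.high
}

func (c intChunker) KeyBelowLowWatermark(key any) bool {
	k, ok := key.(int64)
	return ok && k < c.low
}

// classify reports what the replication client would do with a change
// while the watermark optimization is enabled.
func classify(c chunker, key any) string {
	if c.KeyAboveHighWatermark(key) {
		return "drop" // the copier has not reached this row yet
	}
	if !c.KeyBelowLowWatermark(key) {
		return "defer" // the copier is working on this range right now
	}
	return "apply" // already copied, so the change must be applied
}

func main() {
	c := intChunker{low: 100, high: 200}
	for _, k := range []int64{50, 150, 250} {
		fmt.Println(k, classify(c, k))
	}
}
```

Once the optimization is disabled before cutover, every key would fall into the "apply" case regardless of the copier's position.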

Checkpointing

The replication client tracks two positions:

  • Buffered position: All events have been read from the server and stored in memory
  • Flushed position: All events have been successfully applied to the target table

// Get the safe checkpoint position (opaque string owned by the source).
pos := client.Position()

// Resume from a checkpoint — primes the position and starts streaming.
err := client.StartFromPosition(ctx, savedPosition)

Periodically, changes are flushed to advance the flushed position, which is then used as part of checkpoints. Because all replication changes are idempotent, recovery can safely re-flush some changes; roughly the last minute of progress may be lost and repeated.

Final Cutover coordination

Before a cutover operation can run, there must be no unapplied replication changes. The recommended approach is to call Flush(ctx) first without a lock, and then repeat the flush with the lock held:

// Flush most changes before taking the lock, so that the flush
// repeated under the lock is as short as possible.
if err := client.Flush(ctx); err != nil {
    return err
}

// Acquire table lock
lock, err := dbconn.LockTable(ctx, db, sourceTable)
if err != nil {
    return err
}

// Flush all remaining changes under the lock. FlushUnderTableLock takes
// one lock per target server; a single-target migration passes one.
if err := client.FlushUnderTableLock(ctx, []*dbconn.TableLock{lock}); err != nil {
    return err
}

// This check should be redundant, but we verify everything is applied
if !client.AllChangesFlushed() {
    return errors.New("changes still pending")
}

// Safe to cutover now

The client.Flush() will retry in a loop until the number of pending changes is considered trivial (currently <10K). It is important to handle errors correctly here, because FlushUnderTableLock may fail if it can't flush the pending changes fast enough. This is your cue to abandon the cutover operation for now, and try again when the server is under less load.

Memory backpressure

Each subscription approximates the bytes it is holding in memory (row image + key bytes per buffered change) and parks HasChanged on a per-subscription condition variable when the total reaches DefaultSubscriptionSoftLimitBytes (256 MiB). This keeps wide rows — LONGTEXT, BLOB, large JSON — from OOMing the migrator when the source's write rate outpaces the applier.

The cap is soft: the wait is checked before a change is added, against the buffer's current pre-add size. A row is therefore always admitted whenever sizeBytes < softLimitBytes, even if its own size pushes the total well past the limit; the cap only blocks new arrivals once the buffer is already at or over it. This is intentional — it preserves forward progress regardless of row width — but it does mean peak memory can exceed DefaultSubscriptionSoftLimitBytes by up to one oversized row's worth before the next caller parks.
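The pre-add check can be sketched with a mutex and a condition variable. This is an illustrative model only: `softBuffer`, `add`, and `drain` are hypothetical names, and treating any non-positive limit as "disabled" is a simplification of this sketch rather than the package's documented behaviour.

```go
package main

import (
	"fmt"
	"sync"
)

// softBuffer tracks approximate buffered bytes against a soft limit.
type softBuffer struct {
	mu        sync.Mutex
	cond      *sync.Cond
	sizeBytes int64
	softLimit int64 // <= 0 disables the cap in this sketch
}

func newSoftBuffer(limit int64) *softBuffer {
	b := &softBuffer{softLimit: limit}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// add parks while the buffer is already at or over the limit, then admits
// the row regardless of its own size. The check is against the pre-add size.
func (b *softBuffer) add(rowBytes int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.softLimit > 0 && b.sizeBytes >= b.softLimit {
		b.cond.Wait()
	}
	b.sizeBytes += rowBytes
}

// drain simulates a flush: it empties the buffer and wakes parked callers.
func (b *softBuffer) drain() {
	b.mu.Lock()
	b.sizeBytes = 0
	b.mu.Unlock()
	b.cond.Broadcast()
}

func (b *softBuffer) size() int64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.sizeBytes
}

func main() {
	b := newSoftBuffer(100)
	b.add(500) // admitted: the buffer was empty, even though 500 > 100
	fmt.Println(b.size())

	done := make(chan struct{})
	go func() {
		b.add(10) // parks until drain, unless drain has already run
		close(done)
	}()
	b.drain()
	<-done
	fmt.Println(b.size())
}
```

The oversized first row shows why peak memory can exceed the limit by one row's worth, while the second caller shows the parking behaviour.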

Override via ClientConfig.SubscriptionSoftLimitBytes; pass a negative value to disable the cap entirely. The times_parked_on_soft_limit and size_bytes fields appear in the watermark-toggled log line, and keys_added / keys_dropped_above_high / keys_skipped_not_below_low provide the surrounding context.

Limitation — binlog retention: while parked, the binlog reader makes no progress. If the source rotates past the reader's current position (binlog_expire_logs_seconds) before the buffer drains, the reader will fail to resume and the migration will abort. Tune the soft limit and source retention together for sustained high-write workloads.

Other Minor Features

  • Automatic recovery: Handles transient errors and reconnects to the binlog stream without data loss
  • DDL detection: Monitors for schema changes and notifies the migration coordinator. This is used to abandon any schema changes if the table was externally modified.

Documentation

Overview

Package change contains binary log subscription functionality.

Constants

const (

	// DefaultBatchSize is the fixed number of rows in each batched REPLACE/
	// DELETE statement that the binlog applier emits against the _new
	// table. Larger is better, but we need to keep the run-time of the
	// statement well below dbconn.maximumLockTime so that it doesn't
	// cause copy-row tasks to fail. On Aurora tables with
	// out-of-cache workloads that copy ~300 rows per second this is close
	// to the safe ceiling.
	//
	// Was previously an initial value for an adaptive sizer (feedback()
	// driven by p90 apply time). That mechanism was meaningful when the
	// applier issued `REPLACE INTO _new ... SELECT FROM source` and S-locked
	// rows on the live table, but after #853 the applier emits inline
	// VALUES against _new only — no source-side locks — and the batches
	// are strictly serial inside flushBatch. There's nothing left to
	// throttle, so the batch size is just a constant. See issue #869.
	DefaultBatchSize = 1000

	// DefaultFlushInterval is the interval at which the client flushes all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second
	// DefaultSubscriptionSoftLimitBytes caps the approximate memory held
	// per subscription before HasChanged starts blocking on the buffered
	// map's condition variable. The cap is "soft": a single oversized
	// row admitted when the buffer is empty will exceed the limit, and
	// the next caller will park until that row drains. This keeps wide
	// rows (LONGTEXT / BLOB / large JSON) from OOMing the migrator
	// while still guaranteeing forward progress regardless of row width.
	// See pkg/change/subscription_buffered.go for the accounting model.
	//
	// Operators should be aware that pausing the binlog reader for an
	// extended period risks falling past the source's binlog retention
	// (binlog_expire_logs_seconds). Tune this value, or the source's
	// retention, accordingly.
	DefaultSubscriptionSoftLimitBytes = 256 << 20
	// DefaultTimeout is how long BlockWait is supposed to wait before returning errors.
	DefaultTimeout = 30 * time.Second
)

Variables

var (

	// ErrChangesNotFlushed indicates that not all changes have been flushed from the replication feed.
	ErrChangesNotFlushed = errors.New("not all changes flushed")
)
var ErrPositionNotFound = errors.New("change.Source: cannot resume from position; it is no longer available on the source")

ErrPositionNotFound is returned by StartFromPosition when the underlying source can no longer resume from the requested opaque position — most commonly because the binlog file has been purged on a MySQL source. Wrapped with %w so callers can errors.Is against it.

Functions

func NewServerID

func NewServerID() uint32

NewServerID generates a unique server ID to avoid conflicts with other binlog readers. Uses crypto/rand combined with an atomic counter to ensure uniqueness even when called concurrently. Returns a value in the range 1001-4294967295 to avoid conflicts with typical MySQL server IDs (0-1000).
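One way to combine crypto/rand with an atomic counter is sketched below. This is not the package's implementation: `newServerID`, `idBase`, `idCounter`, and the choice of a single random base per process are assumptions, made so that IDs from one process are distinct and always land in 1001-4294967295.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"math"
	"sync/atomic"
)

// minServerID is the lowest ID handed out, leaving 0-1000 for real servers.
const minServerID = 1001

var (
	idBase    = randomUint32() // random per-process starting point
	idCounter atomic.Uint32    // distinguishes concurrent calls
)

// randomUint32 reads four bytes from the system CSPRNG.
func randomUint32() uint32 {
	var b [4]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err)
	}
	return binary.BigEndian.Uint32(b[:])
}

// newServerID maps base+counter into the range [minServerID, MaxUint32].
func newServerID() uint32 {
	span := uint64(math.MaxUint32) - minServerID + 1
	offset := (uint64(idBase) + uint64(idCounter.Add(1))) % span
	return uint32(minServerID + offset)
}

func main() {
	a, b := newServerID(), newServerID()
	fmt.Println(a >= minServerID, b >= minServerID, a != b)
}
```

The random base reduces the chance of collisions between separate processes, and the counter separates calls within one process.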

Types

type BufferedSubscriptionConfig

type BufferedSubscriptionConfig struct {
	// CurrentTable is the source-side TableInfo. Required.
	CurrentTable *table.TableInfo

	// NewTable is the destination-side TableInfo. May be nil for
	// MoveTables/import flows where source and destination share the
	// same schema; in that case Subscription.Tables() returns
	// [CurrentTable, nil].
	NewTable *table.TableInfo

	// Applier writes batched changes to the target. Required.
	Applier applier.Applier

	// Chunker provides the watermark filter + column mapping. Required.
	Chunker table.MappedChunker

	// Logger receives diagnostic events. Defaults to slog.Default()
	// when nil.
	Logger *slog.Logger

	// SoftLimitBytes is the per-subscription byte cap before
	// HasChanged blocks waiting on the flush path. Zero disables the
	// cap. See bufferedMap.softLimitBytes for the semantics.
	SoftLimitBytes int64
}

BufferedSubscriptionConfig configures NewBufferedSubscription.

type ClientConfig

type ClientConfig struct {
	Logger   *slog.Logger
	ServerID uint32
	DBConfig *dbconn.DBConfig // Database configuration including TLS settings

	// CancelFunc is an optional callback from the caller (e.g. migration or move runner).
	// It is called when a DDL change is detected on a subscribed table, or when a fatal
	// stream error occurs (such as minimal RBR detection or exhausted streamer recreation
	// attempts). The caller is expected to handle cancellation and cleanup.
	// It returns true if the error was acted upon (caller actually cancelled),
	// or false if it was ignored (e.g. because the caller is already past cutover).
	CancelFunc func() bool

	// DDLFilterSchema, when set, broadens DDL detection to cancel on any DDL change
	// in the specified schema, rather than only on exact table matches against subscriptions.
	// This is used by the move runner to detect DDL on any table in the source database.
	DDLFilterSchema string

	// DDLFilterTables, when set alongside DDLFilterSchema, narrows the schema-level
	// DDL detection to only the specified table names. This is used for partial moves
	// where only specific tables from a schema are being moved — DDL on unrelated
	// tables in the same schema should not trigger cancellation.
	// If empty (and DDLFilterSchema is set), all tables in the schema trigger cancellation.
	DDLFilterTables []string

	// SubscriptionSoftLimitBytes overrides DefaultSubscriptionSoftLimitBytes
	// for new subscriptions. Set to a negative value to disable the cap
	// entirely (HasChanged will never block on memory). Zero (the
	// zero-value default) means use DefaultSubscriptionSoftLimitBytes.
	SubscriptionSoftLimitBytes int64
}

func NewClientDefaultConfig

func NewClientDefaultConfig() *ClientConfig

NewClientDefaultConfig returns a default config for the replication client.

type Source

type Source interface {
	// AddSubscription constructs a bufferedMap from (currentTable,
	// newTable, chunker) and registers it. ROW events matching the
	// registered (schema, table) pair are pushed to the subscription's
	// HasChanged. Must be called before Start / StartFromPosition.
	AddSubscription(currentTable, newTable *table.TableInfo, chunker table.MappedChunker) error

	// Start begins streaming from the current source head and spawns the
	// reader goroutine. Returns once the reader is running; the stream
	// itself continues until Close is called or ctx is cancelled.
	// Implementations perform any required validation (privileges,
	// connectivity, server settings) before returning.
	Start(ctx context.Context) error

	// StartFromPosition is the resume-time entry point. It primes the
	// source's internal position to the opaque string previously
	// returned by Position(), then begins streaming as if Start had
	// been called. Implementations validate the position is still
	// resumable (e.g. MySQL: the binlog file has not been purged); an
	// unresumable position is returned wrapped with ErrPositionNotFound.
	StartFromPosition(ctx context.Context, pos string) error

	// Position returns the latest safe-to-resume position as an opaque
	// string. The implementation owns the encoding; spirit never parses
	// it. Advances only at transaction commit boundaries. Returns "" if
	// no position has been observed yet, signaling that a fresh Start is
	// required.
	Position() string

	// Flush requests that all registered subscriptions flush their
	// buffered changes to their targets. Blocks until the flush
	// completes or ctx cancels.
	Flush(ctx context.Context) error

	// FlushUnderTableLock is the cutover-time variant of Flush: the
	// caller holds table locks and we drain the in-flight backlog
	// against that quiescent state. locks carries one lock per target
	// server being written to (a single lock for single-target
	// migrations; one per shard for sharded moves) — the applier
	// executes each target's statements under that target's own lock,
	// since LOCK TABLES blocks writes from every other connection.
	FlushUnderTableLock(ctx context.Context, locks []*dbconn.TableLock) error

	// BlockWait blocks until all events received from the underlying
	// stream up to call-time have been delivered to their subscriptions.
	// Used by the runner around cutover to drain the in-flight backlog.
	// Returns when drained or ctx cancels.
	BlockWait(ctx context.Context) error

	// GetDeltaLen returns the total number of pending changes across
	// all registered subscriptions. Used by callers to decide whether
	// the backlog is small enough to consider cutover.
	GetDeltaLen() int

	// SetWatermarkOptimization toggles the high/low watermark
	// optimization across all subscriptions. Disabled before
	// checksum/cutover to ensure all changes are flushed regardless of
	// watermark position.
	SetWatermarkOptimization(ctx context.Context, enabled bool) error

	// StartPeriodicFlush spawns a background goroutine that flushes the
	// changeset at the given interval. Used by the migrator to advance
	// the safe-flushed position. Calling StartPeriodicFlush while a
	// periodic flush is already running is a no-op.
	StartPeriodicFlush(ctx context.Context, interval time.Duration)

	// StopPeriodicFlush stops the goroutine started by
	// StartPeriodicFlush. Safe to call when no periodic flush is
	// running (no-op).
	StopPeriodicFlush()

	// AllChangesFlushed reports whether the flushed position has
	// caught up to the buffered position (i.e. no in-flight changes
	// remain). For non-binlog implementations, this is equivalent to
	// "have all received events been applied?".
	AllChangesFlushed() bool

	// Close releases all resources. Safe to call more than once.
	Close()
}

Source is the abstraction spirit uses to consume a stream of row changes from a source database. It exists so spirit's replication pipeline is not pinned to the MySQL binlog protocol — alternative implementations (e.g. Vitess VStream) can plug in without touching the applier, the bufferedMap, or any other spirit-side machinery.

The built-in implementation that uses go-mysql's BinlogSyncer lives in this package and backs the existing Client. Out-of-tree implementations construct their own Source value and pass it to spirit via the Move/Migration config.

Lifecycle: construct → AddSubscription(...)* → Start(ctx) OR StartFromPosition(ctx, pos) → Flush / BlockWait / FlushUnderTableLock as needed → Close().

Events flow PUSH-style: when a row event matching one of the subscribed tables arrives, the source implementation looks up the Subscription whose Tables() includes that (schema, table) and calls sub.HasChanged(key, row, deleted) directly. There is no Next() / Recv() loop on this interface — the caller registers subscriptions and lets the source drive them.
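The push-style flow can be sketched as a lookup followed by a direct call. The `HasChanged` signature matches the Subscription interface in this package; the `router`, `tableName`, `dispatch`, and `countingSub` names are hypothetical and only illustrate the routing step.

```go
package main

import "fmt"

// subscription is the one method of Subscription this sketch needs.
type subscription interface {
	HasChanged(key, row []any, deleted bool)
}

// tableName identifies a subscribed table.
type tableName struct{ schema, table string }

// router is a hypothetical registry from table to subscription.
type router struct {
	subs map[tableName]subscription
}

func newRouter() *router {
	return &router{subs: make(map[tableName]subscription)}
}

func (r *router) add(schema, table string, s subscription) {
	r.subs[tableName{schema, table}] = s
}

// dispatch pushes a row event to the matching subscription, if any.
// It reports whether the event was delivered.
func (r *router) dispatch(schema, table string, key, row []any, deleted bool) bool {
	s, ok := r.subs[tableName{schema, table}]
	if !ok {
		return false // not a subscribed table; the event is ignored
	}
	s.HasChanged(key, row, deleted)
	return true
}

// countingSub is a trivial subscription that counts delivered events.
type countingSub struct{ events int }

func (c *countingSub) HasChanged(key, row []any, deleted bool) { c.events++ }

func main() {
	r := newRouter()
	sub := &countingSub{}
	r.add("shop", "orders", sub)

	fmt.Println(r.dispatch("shop", "orders", []any{1}, []any{1, "paid"}, false))
	fmt.Println(r.dispatch("shop", "customers", []any{7}, nil, true))
	fmt.Println(sub.events)
}
```

The caller never polls; it registers subscriptions up front and the source's reader calls into them as events arrive.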

The surface area is intentionally broad to match the existing binlog-backed implementation so all spirit consumers (pkg/migration, pkg/move, pkg/checksum) program against the interface. Resume-time positions are opaque strings (Position / StartFromPosition) so that alternative implementations can encode whatever they need (file+offset, GTID, VStream position, etc.) without leaking to callers.

func NewBinlogClient

func NewBinlogClient(db *sql.DB, host string, username, password string, appl applier.Applier, config *ClientConfig) Source

NewBinlogClient constructs the binlog-backed change.Source. The returned Source talks to MySQL via go-mysql's BinlogSyncer; future alternative sources (e.g. VStream) will live behind their own constructors. The appl applier is required.

func NewGTIDClient

func NewGTIDClient(db *sql.DB, host string, username, password string, appl applier.Applier, config *ClientConfig) Source

NewGTIDClient constructs the GTID-backed change.Source. It mirrors NewBinlogClient: the appl applier is required.

EXPERIMENTAL. See docs in pkg/migration and pkg/move for the --gtid flag.

type Subscription

type Subscription interface {
	HasChanged(key, row []any, deleted bool)
	Length() int
	// Flush writes the pending changes to the target(s) via the applier.
	// When underLock is true, locks carries the table locks the caller is
	// holding — one per target server — and the applier executes each
	// target's statements under that target's own lock.
	Flush(ctx context.Context, underLock bool, locks []*dbconn.TableLock) (allChangesFlushed bool, err error)
	Tables() []*table.TableInfo // returns the tables related to the subscription in currentTable, newTable order

	// SetWatermarkOptimization toggles both high and low watermark
	// optimizations. For non-memory-comparable PKs toggling switches the
	// subscription between map mode and queue mode; on such a transition
	// it drains the outgoing store via the applier so only one store has
	// pending entries at a time. Returns the drain error if any.
	SetWatermarkOptimization(ctx context.Context, enabled bool) error

	// Close signals that no further events will be delivered. Any HasChanged
	// caller currently parked on backpressure (e.g. the bufferedMap soft
	// memory limit) is unblocked so the binlog reader goroutine can exit.
	// Close does NOT flush; pending changes are discarded along with the
	// subscription. It is safe to call more than once.
	Close()
}

func NewBufferedSubscription

func NewBufferedSubscription(cfg BufferedSubscriptionConfig) (Subscription, error)

NewBufferedSubscription constructs the default bufferedMap-backed Subscription. It is the public counterpart to binlogClient's internal AddSubscription helper: out-of-tree change.Source implementations (e.g. strata's pkg/vstream) call this from their own AddSubscription to build a Subscription the runner / copier can drive.

The returned Subscription is not yet wired into a registry — the caller is responsible for storing it and routing row events to its HasChanged method. The internal sync.Cond is initialised before return (matching subscriptionRegistry.AddBuffered) so HasChanged / Flush / SetWatermarkOptimization are safe to call immediately.
