datasync

package
v0.15.0
Published: Jun 15, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package datasync implements the `sync` command: a continuous, heterogeneous data sync.

A sync performs an initial copy of the source tables into the target, then streams ongoing changes from the source to the target indefinitely until the context is cancelled. Unlike `move`, there is no checksum and no cutover — the sync simply keeps the target caught up with the source for as long as it runs. Cancelling the command (SIGINT/SIGTERM, or a caller-cancelled context) drains the in-flight backlog and exits cleanly.

The source is either a built-in MySQL binlog client (constructed from SourceDSN) or a caller-injected change.Source — e.g. a Vitess / PlanetScale VStream. The target is written through an applier; today that is a MySQL SingleTargetApplier, but the applier abstraction is what makes the sync heterogeneous: a future Postgres applier would let this sync MySQL → Postgres without changing the runner.

The package is deliberately named datasync (not sync) so it does not shadow the standard library's sync package. The CLI command is still `sync` (the kong field name drives the command name, the same way migration.Migration is exposed as `migrate`).

Types

type Runner

type Runner struct {
	// contains filtered or unexported fields
}

Runner executes a Sync: an initial copy followed by continuous replication that runs until the context is cancelled.

func NewRunner

func NewRunner(s *Sync) (*Runner, error)

NewRunner validates the Sync config and returns a Runner. The CLI supplies defaults via kong; programmatic callers get the same defaults applied here as a safety net.
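
The defaulting described above could look roughly like the sketch below. The `syncConfig` type and `applyDefaults` function are hypothetical stand-ins, not the package's own code; the default values are taken from the kong tags on Sync, and treating a zero value as "unset" follows the note on Sync.Validate.

```go
package main

import (
	"fmt"
	"time"
)

// syncConfig is a hypothetical stand-in for the tunable fields of Sync.
type syncConfig struct {
	TargetChunkTime time.Duration
	Threads         int
	WriteThreads    int
	FlushInterval   time.Duration
}

// applyDefaults fills zero-valued fields with the defaults listed in the
// kong tags on Sync. Non-zero values are returned unchanged.
func applyDefaults(c syncConfig) syncConfig {
	if c.TargetChunkTime == 0 {
		c.TargetChunkTime = 5 * time.Second
	}
	if c.Threads == 0 {
		c.Threads = 4
	}
	if c.WriteThreads == 0 {
		c.WriteThreads = 4
	}
	if c.FlushInterval == 0 {
		c.FlushInterval = 30 * time.Second
	}
	return c
}

func main() {
	// Only Threads is set explicitly; everything else falls back to a default.
	c := applyDefaults(syncConfig{Threads: 8})
	fmt.Println(c.Threads, c.WriteThreads, c.TargetChunkTime, c.FlushInterval)
	// prints "8 4 5s 30s"
}
```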

func (*Runner) Cancel

func (r *Runner) Cancel()

Cancel stops the sync by cancelling the Run context. Safe to call before Run has started (no-op until cancelFunc is set).

func (*Runner) ChecksumReady

func (r *Runner) ChecksumReady() <-chan struct{}

ChecksumReady returns a channel that is closed once the continuous checker has been constructed — that is, when the initial copy and the post-copy flush have completed and runContinuousChecksum has started.
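
Because readiness is signalled by closing a channel, a caller can poll it without blocking. The sketch below shows one way to do that; `isClosed` is a hypothetical helper, and the locally created channel stands in for the one returned by ChecksumReady.

```go
package main

import "fmt"

// isClosed reports whether ch has been closed, without blocking.
// It assumes the channel is only ever closed and never sent on.
func isClosed(ch <-chan struct{}) bool {
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	// ready stands in for the channel returned by ChecksumReady.
	ready := make(chan struct{})
	fmt.Println(isClosed(ready)) // prints "false"

	close(ready) // the runner would do this once the checker exists
	fmt.Println(isClosed(ready)) // prints "true"
}
```

A check like this could be used to decide whether a ChecksumStats snapshot is meaningful yet or is still the zero value.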

func (*Runner) ChecksumStats

func (r *Runner) ChecksumStats() checksum.ContinuousCheckerStats

ChecksumStats returns a point-in-time snapshot of continuous-checksum counters. Returns the zero value when the checker has not yet been constructed (initial copy still running).

func (*Runner) Close

func (r *Runner) Close() error

Close releases resources. Safe to call once after Run returns. The applier's worker lifecycle is owned by the copier (which stops it after the initial copy), so Close does not stop it. An injected change.Source and injected applier/target are owned by the caller, but Close still calls change.Source.Close (documented idempotent) to release the runner's reference.

func (*Runner) DumpCheckpoint

func (r *Runner) DumpCheckpoint(ctx context.Context) error

DumpCheckpoint records the copier watermark (and, for continuous sync, the change-feed position) on the target so a restart can resume a partial copy and the stream. It is a no-op until the copy pipeline is built.

func (*Runner) FirstCleanPass

func (r *Runner) FirstCleanPass() <-chan struct{}

FirstCleanPass returns a channel that is closed the first time the continuous checksum completes a clean pass — i.e. every chunk was READ-verified equal (on its initial read or via retry) and no chunk needed a recopy. A pass that repaired chunks via recopy does not qualify; the repaired ranges are re-verified by the following pass before the signal can fire. Programmatic callers that gate on "data is known consistent" (e.g. the import feature) should block on this channel.

The accessor is non-blocking: it returns immediately with a channel the caller can wait on. The Runner-owned channel is closed by an internal goroutine once the checker fires its own FirstCleanPass — so it's safe to call before Run, after Run, or from a watchdog.
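
A caller that gates on consistency will usually want an upper bound on the wait. The following is a minimal sketch of that pattern; `waitClean` and `errNotConsistent` are hypothetical names, and the locally created channel stands in for the one returned by FirstCleanPass.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotConsistent is a hypothetical error used only by this sketch.
var errNotConsistent = errors.New("no clean checksum pass before the deadline")

// waitClean blocks until clean is closed or timeout elapses, whichever
// happens first.
func waitClean(clean <-chan struct{}, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-clean:
		return nil
	case <-timer.C:
		return errNotConsistent
	}
}

func main() {
	// clean stands in for the channel returned by FirstCleanPass.
	clean := make(chan struct{})

	// Simulate the checker completing its first clean pass shortly after start.
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(clean)
	}()

	fmt.Println(waitClean(clean, time.Second)) // prints "<nil>"
}
```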

func (*Runner) Progress

func (r *Runner) Progress() status.Progress

Progress returns a structured snapshot of the sync's current state: the phase, a short human-readable summary, and per-table copy progress during the initial copy.

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run performs the initial copy and then streams changes continuously until ctx is cancelled. A clean cancellation returns nil; a fatal source event (e.g. DDL) returns an error.
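
A programmatic caller controls the lifetime of the sync purely through the context. The sketch below illustrates that contract under stated assumptions: `syncRunner` mirrors only the documented Run signature, while `fakeRunner` and `runFor` are hypothetical and exist so the example runs without a database.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// syncRunner captures the one method this sketch relies on; it matches the
// documented signature of (*Runner).Run.
type syncRunner interface {
	Run(ctx context.Context) error
}

// fakeRunner is a hypothetical stand-in that blocks until ctx is cancelled
// and then reports a clean shutdown, as the documentation describes.
type fakeRunner struct{}

func (fakeRunner) Run(ctx context.Context) error {
	<-ctx.Done()
	return nil
}

// runFor starts r, cancels its context after d, and returns Run's result.
func runFor(r syncRunner, d time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel from another goroutine once d has elapsed.
	timer := time.AfterFunc(d, cancel)
	defer timer.Stop()

	return r.Run(ctx)
}

func main() {
	err := runFor(fakeRunner{}, 50*time.Millisecond)
	fmt.Println("run returned:", err) // prints "run returned: <nil>"
}
```

With the real Runner, a non-nil result would indicate a fatal condition such as a DDL event on the source rather than a cancellation.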

func (*Runner) SetLogger

func (r *Runner) SetLogger(logger *slog.Logger)

SetLogger overrides the logger (used by programmatic callers to capture progress output).

func (*Runner) Status

func (r *Runner) Status() string

Status returns a one-line, human-readable status for logging. It does not log itself; status.WatchTask (when used) logs the returned value.

type Sync

type Sync struct {
	SourceDSN       string        `name:"source-dsn" help:"Where to sync the tables from." default:"spirit:spirit@tcp(127.0.0.1:3306)/src"`
	TargetDSN       string        `name:"target-dsn" help:"Where to sync the tables to." default:"spirit:spirit@tcp(127.0.0.1:3306)/dest"`
	TargetChunkTime time.Duration `name:"target-chunk-time" help:"How long each copy chunk should take." default:"5s"`
	Threads         int           `name:"threads" help:"How many chunks to copy in parallel during the initial copy." default:"4"`
	WriteThreads    int           `name:"write-threads" help:"How many concurrent write threads to use on the target." default:"4"`
	// FlushInterval controls how often buffered changes are applied to the
	// target during continuous sync — i.e. the replication latency vs.
	// batching trade-off. Defaults to change.DefaultFlushInterval.
	FlushInterval time.Duration `name:"flush-interval" help:"How often to flush buffered changes to the target during continuous sync." default:"30s"`

	// CopyOnly performs only the initial copy and then returns — no change
	// capture and no continuous replication, so no change.Source is
	// constructed or required. Useful for a one-shot snapshot, or when the
	// source cannot provide a change feed (e.g. a managed Vitess without
	// binlog/VStream access, or a replica lacking the REPLICATION privileges
	// the built-in binlog client needs). A final checkpoint is still written,
	// so a later run resumes the copy from there (or no-ops if it had already
	// completed) rather than starting over.
	CopyOnly bool `name:"copy-only" help:"Only run the initial copy, then exit (no continuous change capture)." default:"false"`

	// Force, when set, makes the runner drop and recreate the target database
	// at startup *unless* a resumable checkpoint exists — i.e. it only nukes
	// the target when the copy could not have resumed anyway. A resumable
	// run (checkpoint present) is left intact and resumes as normal. Intended
	// for testing/iterating, where a previous partial run can leave the target
	// non-empty with no usable checkpoint, otherwise tripping the fresh-sync
	// target-empty guard.
	Force bool `name:"force" help:"Drop and recreate the target database when the copy cannot resume from a checkpoint." default:"false"`

	// GTID switches the built-in change source from binlog file+position to
	// MySQL GTIDs. EXPERIMENTAL — see pkg/change/gtid.go. Ignored when a
	// pre-constructed Source is injected. Requires gtid_mode=ON and
	// enforce_gtid_consistency=ON on the source.
	GTID bool `name:"gtid" help:"EXPERIMENTAL: use GTID-based change source instead of binlog file+position" default:"false"`

	// Source optionally provides a pre-constructed change.Source to use
	// for replication instead of constructing a built-in MySQL-binlog
	// client from SourceDSN. When set, the runner uses this as the change
	// feed. SourceDSN is still required for source-side SQL (SHOW TABLES,
	// SHOW CREATE TABLE, the initial-copy SELECTs). Setting Source requires
	// setting Applier (see below).
	//
	// Intended for callers (e.g. strata's Vitess/PlanetScale import) that
	// need a non-MySQL-binlog change source.
	Source change.Source `kong:"-"`

	// Applier optionally provides a pre-constructed applier.Applier. When
	// set, the runner uses this instead of constructing a MySQL
	// SingleTargetApplier from the target. Required when Source is set: the
	// injected change.Source needs the same applier instance the copier
	// uses, so all writes flow through one logical apply path.
	Applier applier.Applier `kong:"-"`

	// Target optionally supplies a pre-opened target. When nil, a single
	// MySQL target is opened from TargetDSN (auto-creating the database if
	// it does not exist). Sync writes to exactly one logical target — there
	// is no N:M fan-out.
	Target *applier.Target `kong:"-"`
}

Sync is the configuration for a continuous data sync. The exported, kong-tagged fields are the CLI surface; the kong:"-" fields are for programmatic callers (e.g. strata's Vitess/PlanetScale import) that inject a non-MySQL change source and/or a custom applier.

func (*Sync) Run

func (s *Sync) Run() error

Run is the kong CLI entry point. It runs the sync until the process receives SIGINT/SIGTERM, then drains and exits cleanly. Programmatic callers that want to supply their own context should construct a Runner via NewRunner and call Runner.Run directly.

Signal handling is two-stage: the first SIGINT/SIGTERM cancels the context for a graceful drain, and a second forces an immediate exit. The force-quit matters because the change feed can be slow to unwind (e.g. a busy or repeatedly reconnecting binlog stream); without it, a second Ctrl+C would be swallowed and the command would appear hung.
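
The two-stage pattern can be sketched with the standard os/signal package as follows. This is an illustration of the general technique, not the package's implementation; `twoStage` is a hypothetical helper, and the exit code used for the forced exit is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// twoStage returns a handler to be called once per received signal.
// The first call invokes cancel (graceful drain); every later call invokes
// force (immediate exit). It is meant to be called from a single goroutine.
func twoStage(cancel, force func()) func() {
	seen := 0
	return func() {
		seen++
		if seen == 1 {
			cancel()
			return
		}
		force()
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Buffered so that a signal arriving while the handler is busy is not lost.
	sigs := make(chan os.Signal, 2)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	onSignal := twoStage(cancel, func() { os.Exit(1) }) // exit code is an assumption
	go func() {
		for range sigs {
			onSignal()
		}
	}()

	// Simulate the first Ctrl+C so the sketch terminates on its own.
	sigs <- syscall.SIGINT
	<-ctx.Done()
	fmt.Println("first signal received: draining")
}
```

Keeping the signal channel registered after the first signal is what allows the second Ctrl+C to be observed at all.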

func (*Sync) Validate

func (s *Sync) Validate() error

Validate is called by Kong after parsing to check for invalid flag values. Zero values mean "use the default" (NewRunner fills them in), so they are not rejected here; only explicitly negative or otherwise invalid values are caught. It mirrors migration.Migration.Validate.
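
The rule that zero passes and negatives fail could be expressed along these lines. The `syncFlags` type, its `validate` method, and the error messages are all hypothetical; which fields the real Validate checks is not stated in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// syncFlags is a hypothetical stand-in for a few numeric fields of Sync.
type syncFlags struct {
	Threads       int
	WriteThreads  int
	FlushInterval time.Duration
}

// validate rejects negative values only. Zero is accepted because it means
// "use the default", which is filled in later.
func (f syncFlags) validate() error {
	if f.Threads < 0 {
		return errors.New("threads must not be negative")
	}
	if f.WriteThreads < 0 {
		return errors.New("write-threads must not be negative")
	}
	if f.FlushInterval < 0 {
		return errors.New("flush-interval must not be negative")
	}
	return nil
}

func main() {
	zero := syncFlags{}
	fmt.Println(zero.validate()) // prints "<nil>"

	bad := syncFlags{Threads: -1}
	fmt.Println(bad.validate()) // prints "threads must not be negative"
}
```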
