ghostferry

package module
v0.0.0-...-7dd3c68
Warning

This package is not in the latest version of its module.

Published: Mar 5, 2021 License: MIT Imports: 44 Imported by: 0

README

Ghostferry

Ghostferry is a library that enables you to selectively copy data from one MySQL instance to another with a minimal amount of downtime.

It is inspired by GitHub's gh-ost, but instead of copying data from and to the same database, Ghostferry copies data from one database to another and can copy only part of the data.

There is an example application called ghostferry-copydb included (under the copydb directory) that demonstrates this library by copying an entire database from one machine to another.

Talk to us on IRC at irc.freenode.net #ghostferry.

Stable Versus Experimental

This project is an experimental fork of the official ghostferry project. We add various features and fixes that have not (yet) made it into the upstream version.

Although the changes in this fork are considered experimental, we use them on a daily basis and consider them stable in practice. Still, use them at your own risk, and opt for the original version if its features satisfy your needs.

If you discover bugs in any feature added in this fork, please open an issue ticket with the details. If the issue is within the base system, open an issue on the original project - we constantly monitor stable changes there and incorporate them into this fork.

Features and fixes added in this fork include:

  • fix data corruption for binary columns: this fix has not made it into upstream master yet.
  • fix failure to resume: this fix has not made it into upstream master yet.
  • allow specifying table creation order: Ghostferry does not allow copying tables using foreign key constraints, because it copies data in batches, which is likely to violate constraints, leading to failures during the copy phase.
    The forked version allows specifying the order in which tables need to be created in the target database. This allows working around constraints in the setup phase. Additionally, disabling foreign key constraint enforcement on the target database session/connection allows working around constraints during the copy phase.
    Note that an incomplete execution of Ghostferry will leave the database in an inconsistent state until the copy is resumed and completed.
  • improved handling of foreign key constraints: automatically infer the table creation order if the database contains foreign key constraints and no manual table order is specified in the configuration.
    Note that this merely automates part of the setup needed for supporting foreign key constraints. There are still several limitations in place for migrating such databases, and the feature must be used with great care. In particular, database or table rewrites may introduce invalid target database states that are not recoverable.
  • use DBReadRetries configuration setting also for retrying reading from the binlog server (instead of using a hardcoded retry limit of 5).
  • more robust handling of bigint column values: this fix has not made it into upstream master yet.
  • support writing resume/state data to file instead of using stdout.
  • support writing resume/state data to target database.
  • support copying tables without "paging" primary keys: Ghostferry requires integer auto-increment primary keys for copying data. An optional feature in this fork allows marking tables for "full copy", which makes it possible to copy tables that do not meet this primary key requirement. Use this feature with care and only on tables with few rows, as the copy process requires locking the entire table on the source database.
  • support schema modifications during the cutover phase: unlike the original version of Ghostferry, which ignores all DDL events (such as CREATE/ALTER/DROP TABLE or TRUNCATE TABLE statements), this fork propagates such events from the source to the target database.
    Note that there are a few restrictions/limitations with this:
    • schema changes occurring during the copy phase cannot be handled if the columns of a table whose copy is in progress change. There are workarounds in place to recover the copy process after a restart, though (see DelayDataIterationUntilBinlogWriterShutdown and FailOnFirstTableCopyError).
    • data integrity verification is not supported once a schema is changed, as its current implementation relies on generating hashes/fingerprints of table rows. Once schemas are modified, it is currently not possible to generate fingerprints that are consistent across the different schemas on the source and target DB.
    • database/table name rewrites are not supported, as they would require non-trivial rewrites of schema-changing statements when tables are altered, renamed, created, or deleted.
    • GRANT and REVOKE statements are ignored. These are not part of the schema per se, but it is still worth pointing out.
    • CREATE PROCEDURE, CREATE FUNCTION, DROP PROCEDURE, and DROP FUNCTION statements are currently not supported and are ignored during replication. Likewise, functions or procedures defined on the source before replication starts are not copied to the target DB.
  • support reading from a (read-only) DB replica: allow using locks within Ghostferry (instead of on the source database) to avoid race conditions between the data copy and binlog replication. The upstream version of Ghostferry locks tables on the source, potentially interfering with the application. Furthermore, by using locks outside the DB, it is possible to replicate from read-only sources where locking is not an option (e.g., cloud-SQL).
  • support non-int primary keys: extend Ghostferry to support signed/unsigned integers, string, binary/varbinary, and composite primary keys (that use ints and strings). This vastly reduces the types of tables for which "full copy" is required. Additionally, support iterating over table rows to copy in descending order. This allows reading recent data first if the pagination key reflects chronological data.
    Note that there are a few restrictions/limitations with this:
    • row verification is not implemented yet. Enabling the inline/iterative verifiers causes a runtime error if data is requested to be verified for an incompatible table.
    • because signed integer primary keys are now supported, the maximum supported key value is now 2^63 (previously 2^64). In practice, DB key values of this size are unlikely, so the legacy behavior is not configurable.
  • more robust disabling of inline-verifier: this fix has not made it into upstream master yet.
  • support throttling of data migration separately from replication. This allows prioritizing data replication over the copy of old data (or vice versa).
  • support exposing the binlog writer state as part of the status portal, as well as a new /api/health HTTP endpoint that returns the status as JSON. This endpoint can also be used for Kubernetes liveness probes by specifying an allowed maximum value for the state age; the endpoint returns HTTP 500 once that maximum has been exceeded.
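To illustrate how a table creation order can be inferred from foreign key constraints, the following sketch applies Kahn's topological sort to a table dependency map. The `creationOrder` function and its input format are hypothetical and not part of Ghostferry's API; the fork's actual implementation may differ, for example in how it treats self-referencing tables or dependency cycles.

```go
package main

import (
	"fmt"
	"sort"
)

// creationOrder returns an order in which tables can be created so that each
// table comes after every table it references via a foreign key.
// deps maps a table name to the tables it references (hypothetical input).
func creationOrder(deps map[string][]string) ([]string, error) {
	inDegree := make(map[string]int)      // referenced tables not yet placed
	children := make(map[string][]string) // referenced table => referencing tables
	for table, parents := range deps {
		if _, ok := inDegree[table]; !ok {
			inDegree[table] = 0
		}
		for _, p := range parents {
			if p == table {
				continue // a self-reference does not constrain creation order
			}
			if _, ok := inDegree[p]; !ok {
				inDegree[p] = 0
			}
			inDegree[table]++
			children[p] = append(children[p], table)
		}
	}

	// Seed the queue with tables that reference nothing; sort for determinism.
	var ready []string
	for t, d := range inDegree {
		if d == 0 {
			ready = append(ready, t)
		}
	}
	sort.Strings(ready)

	var order []string
	for len(ready) > 0 {
		t := ready[0]
		ready = ready[1:]
		order = append(order, t)
		var next []string
		for _, c := range children[t] {
			inDegree[c]--
			if inDegree[c] == 0 {
				next = append(next, c)
			}
		}
		sort.Strings(next)
		ready = append(ready, next...)
	}

	// Any table that was never placed is part of a foreign key cycle.
	if len(order) != len(inDegree) {
		return nil, fmt.Errorf("foreign key cycle among %d tables", len(inDegree)-len(order))
	}
	return order, nil
}

func main() {
	order, err := creationOrder(map[string][]string{
		"orders":      {"customers"},
		"order_items": {"orders", "products"},
		"customers":   nil,
		"products":    nil,
	})
	fmt.Println(order, err)
}
```

A cycle has no valid creation order; in that case the setup still depends on disabling foreign key enforcement on the target session, as described above.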

Overview of How it Works

An overview of Ghostferry's high-level design is expressed in the TLA+ specification, under the tlaplus directory. It may be useful to consult it, as it provides a concise definition. However, the specification might not be entirely correct, as proofs remain elusive.

At a high level, Ghostferry is broken into several components that together copy the data. These are documented at https://shopify.github.io/ghostferry/master/technicaloverview.html

Development Setup

Install:

  • Have Docker installed
  • Clone the repo
  • docker-compose up -d mysql-1 mysql-2

Run tests:

  • make test

Test copydb:

  • make copydb && ghostferry-copydb -verbose examples/copydb/conf.json
  • For a more detailed tutorial, see the documentation.

Ruby Integration Tests

Take note of the following options (DEBUG and TESTOPTS), as used in this example:

Example: bundle exec rake test DEBUG=1 TESTOPTS="-v --name=TrivialIntegrationTests#test_logged_query_omits_columns"

Documentation

Constants

const (
	VerifierTypeChecksumTable  = "ChecksumTable"
	VerifierTypeIterative      = "Iterative"
	VerifierTypeInline         = "Inline"
	VerifierTypeNoVerification = "NoVerification"

	LockStrategySourceDB     = "LockOnSourceDB"
	LockStrategyInGhostferry = "LockInGhostferry"
	LockStrategyNone         = "None"
)
const (
	StateStarting            = "starting"
	StateCopying             = "copying"
	StateWaitingForCutover   = "wait-for-cutover"
	StateVerifyBeforeCutover = "verify-before-cutover"
	StateCutover             = "cutover"
	StateDone                = "done"

	// useful only for debugging during development - way too verbose for debug
	// logging in production
	// NOTE: This may log confidential data - don't ever use for production data
	IncrediblyVerboseLogging = false
)
const (
	TableActionWaiting   = "waiting"
	TableActionCopying   = "copying"
	TableActionCompleted = "completed"
)
const (
	// CompressionSnappy is used to identify Snappy (https://google.github.io/snappy/) compressed column data
	CompressionSnappy = "SNAPPY"
)

Variables

var (
	VersionString string = "?.?.?+??????????????+???????"
	WebUiBasedir  string = ""
)

Functions

func CheckDbIsAReplica

func CheckDbIsAReplica(db *sql.DB) (bool, error)

func DefaultBuildSelect

func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey *PaginationKeyData, batchSize uint64, sortDescending bool) (squirrel.SelectBuilder, error)
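DefaultBuildSelect builds the query for one batch of the data copy. As a rough, dependency-free illustration of the keyset pagination such a builder performs, the sketch below assembles the SQL by hand instead of using squirrel. The `buildBatchSelect` helper is hypothetical, handles only a single pagination column, and omits details the real builder may add (such as composite keys or locking clauses).

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatchSelect is a hypothetical, simplified keyset-pagination query
// builder for a single pagination column. lastKey is nil for the first batch.
// It returns the SQL text and its bind arguments.
func buildBatchSelect(columns []string, table, keyColumn string, lastKey *uint64, batchSize uint64, sortDescending bool) (string, []interface{}) {
	cmp, order := ">", "ASC"
	if sortDescending {
		// Walk from the highest key downwards, e.g. newest rows first.
		cmp, order = "<", "DESC"
	}
	var sb strings.Builder
	fmt.Fprintf(&sb, "SELECT %s FROM %s", strings.Join(columns, ", "), table)
	var args []interface{}
	if lastKey != nil {
		// Resume strictly after the last key copied in the previous batch.
		fmt.Fprintf(&sb, " WHERE %s %s ?", keyColumn, cmp)
		args = append(args, *lastKey)
	}
	fmt.Fprintf(&sb, " ORDER BY %s %s LIMIT %d", keyColumn, order, batchSize)
	return sb.String(), args
}

func main() {
	last := uint64(42)
	query, args := buildBatchSelect([]string{"`id`", "`data`"}, "`db`.`t`", "`id`", &last, 200, true)
	fmt.Println(query, args)
}
```

Paginating by key rather than with OFFSET keeps each batch query cheap regardless of how far the copy has progressed, since the database can seek directly to the last copied key.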

func GetMd5HashesSql

func GetMd5HashesSql(schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (string, []interface{}, error)

func Int64Value

func Int64Value(value interface{}) (int64, bool)

func MaskedDSN

func MaskedDSN(c *mysql.Config) string

func NewMysqlPosition

func NewMysqlPosition(file string, position uint32, err error) (mysql.Position, error)

func NewStateTrackerFromTargetDB

func NewStateTrackerFromTargetDB(f *Ferry) (s *StateTracker, state *SerializableState, err error)

func NonExistingPaginationKeyColumnError

func NonExistingPaginationKeyColumnError(schema, table, paginationKey string) error

NonExistingPaginationKeyColumnError is exported to facilitate black-box testing.

func NonExistingPaginationKeyError

func NonExistingPaginationKeyError(schema, table string) error

NonExistingPaginationKeyError is exported to facilitate black-box testing.

func QuotedDatabaseNameFromString

func QuotedDatabaseNameFromString(database string) string

func QuotedTableName

func QuotedTableName(table *TableSchema) string

func QuotedTableNameFromString

func QuotedTableNameFromString(database, table string) string

func ScanByteRow

func ScanByteRow(rows *sqlorig.Rows, columnCount int) ([][]byte, error)

func ShowMasterStatusBinlogPosition

func ShowMasterStatusBinlogPosition(db *sql.DB) (mysql.Position, error)

func Uint64Value

func Uint64Value(value interface{}) (uint64, bool)

func UnsupportedPaginationKeyError

func UnsupportedPaginationKeyError(schema, table, paginationKey string) error

UnsupportedPaginationKeyError is exported to facilitate black-box testing.

func WaitForThrottle

func WaitForThrottle(t Throttler)

func WithRetries

func WithRetries(maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, f func() error) (err error)

func WithRetriesContext

func WithRetriesContext(ctx context.Context, maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, f func() error) (err error)

Types

type AtomicBoolean

type AtomicBoolean int32

func (*AtomicBoolean) Get

func (a *AtomicBoolean) Get() bool

func (*AtomicBoolean) Set

func (a *AtomicBoolean) Set(b bool)
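AtomicBoolean is declared as an int32, which suggests Get and Set are built on `sync/atomic`. The sketch below shows that common approach (1 for true, 0 for false); it is an assumption about the implementation, not a copy of it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// AtomicBoolean matches the declared type. The method bodies are a sketch
// assuming the usual encoding: 1 means true, 0 means false.
type AtomicBoolean int32

// Get atomically reads the flag.
func (a *AtomicBoolean) Get() bool {
	return atomic.LoadInt32((*int32)(a)) == 1
}

// Set atomically writes the flag.
func (a *AtomicBoolean) Set(b bool) {
	var v int32
	if b {
		v = 1
	}
	atomic.StoreInt32((*int32)(a), v)
}

func main() {
	var done AtomicBoolean
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		done.Set(true) // safe to call concurrently with Get
	}()
	wg.Wait()
	fmt.Println(done.Get())
}
```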

type BatchWriter

type BatchWriter struct {
	DB             *sql.DB
	InlineVerifier *InlineVerifier
	StateTracker   *StateTracker

	DatabaseRewrites map[string]string
	TableRewrites    map[string]string

	WriteRetries int
	// contains filtered or unexported fields
}

func (*BatchWriter) Initialize

func (w *BatchWriter) Initialize()

func (*BatchWriter) WriteRowBatch

func (w *BatchWriter) WriteRowBatch(batch RowBatch) error

type BatchWriterVerificationFailed

type BatchWriterVerificationFailed struct {
	// contains filtered or unexported fields
}

func (BatchWriterVerificationFailed) Error

type BinlogDeleteEvent

type BinlogDeleteEvent struct {
	*DMLEventBase
	// contains filtered or unexported fields
}

func (*BinlogDeleteEvent) AsSQLString

func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, error)

func (*BinlogDeleteEvent) NewValues

func (e *BinlogDeleteEvent) NewValues() RowData

func (*BinlogDeleteEvent) OldValues

func (e *BinlogDeleteEvent) OldValues() RowData

func (*BinlogDeleteEvent) VerifierPaginationKey

func (e *BinlogDeleteEvent) VerifierPaginationKey() (uint64, error)

type BinlogInsertEvent

type BinlogInsertEvent struct {
	*DMLEventBase
	// contains filtered or unexported fields
}

func (*BinlogInsertEvent) AsSQLString

func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error)

func (*BinlogInsertEvent) NewValues

func (e *BinlogInsertEvent) NewValues() RowData

func (*BinlogInsertEvent) OldValues

func (e *BinlogInsertEvent) OldValues() RowData

func (*BinlogInsertEvent) VerifierPaginationKey

func (e *BinlogInsertEvent) VerifierPaginationKey() (uint64, error)

type BinlogPosition

type BinlogPosition struct {
	// A binlog position emitted by the binlog-streamer consists of two parts:
	// First, the last emitted event position, which refers to the event that
	// we received from the MySQL master and that we hand to clients. Second,
	// a position from which we can resume a binlog-streamer.
	// Ideally, these two values would be the same, but in reality they are
	// not, because some events are streamed in a series (e.g. DML events
	// require a table-map event to be seen first).
	// As a result, we always stream event positions as a pair - if a binlog
	// streamer is resumed from an event that is not safe to resume from, we
	// resume from the most recent (earlier) event from which we can safely
	// resume and simply suppress emitting these events up to the point of the
	// last event returned.
	//
	// the actual binlog position of an event emitted by the streamer
	EventPosition mysql.Position
	// the position from which one needs to point the streamer if we want to
	// resume from after this event
	ResumePosition mysql.Position
}

func NewResumableBinlogPosition

func NewResumableBinlogPosition(pos mysql.Position) BinlogPosition

func (BinlogPosition) Compare

func (p BinlogPosition) Compare(o BinlogPosition) int

func (BinlogPosition) String

func (b BinlogPosition) String() string
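The comparison of binlog positions can be sketched as ordering by binlog file name first and by byte offset second. The `Position` type below is a hypothetical stand-in for `mysql.Position`, and the sketch assumes BinlogPosition.Compare orders by EventPosition; the library's actual comparison may differ.

```go
package main

import "fmt"

// Position is a hypothetical stand-in for mysql.Position: a binlog file
// name plus a byte offset within that file.
type Position struct {
	Name string
	Pos  uint32
}

// compare returns -1, 0, or 1. It assumes binlog file names share an
// equal-length, zero-padded numeric suffix (mysql-bin.000009 < mysql-bin.000010),
// so comparing names as strings orders files correctly; offsets break ties.
func compare(a, b Position) int {
	switch {
	case a.Name < b.Name:
		return -1
	case a.Name > b.Name:
		return 1
	case a.Pos < b.Pos:
		return -1
	case a.Pos > b.Pos:
		return 1
	default:
		return 0
	}
}

// binlogPosition pairs an emitted event's position with the (possibly
// earlier) position from which a streamer must resume.
type binlogPosition struct {
	EventPosition  Position
	ResumePosition Position
}

// compareBinlogPositions assumes ordering is defined by EventPosition.
func compareBinlogPositions(p, o binlogPosition) int {
	return compare(p.EventPosition, o.EventPosition)
}

func main() {
	a := Position{Name: "mysql-bin.000009", Pos: 500}
	b := Position{Name: "mysql-bin.000010", Pos: 4}
	fmt.Println(compare(a, b))
}
```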

type BinlogSchemaChangeEvent

type BinlogSchemaChangeEvent struct {
	*DXLEventBase
	// contains filtered or unexported fields
}

func (*BinlogSchemaChangeEvent) AsSQLString

func (e *BinlogSchemaChangeEvent) AsSQLString(schemaName, tableName string) (string, error)

func (*BinlogSchemaChangeEvent) Database

func (e *BinlogSchemaChangeEvent) Database() string

func (*BinlogSchemaChangeEvent) IsAutoTransaction

func (e *BinlogSchemaChangeEvent) IsAutoTransaction() bool

func (*BinlogSchemaChangeEvent) SqlCommand

func (e *BinlogSchemaChangeEvent) SqlCommand() string

func (*BinlogSchemaChangeEvent) Table

func (e *BinlogSchemaChangeEvent) Table() string

type BinlogStreamer

type BinlogStreamer struct {
	DB           *sql.DB
	DBConfig     *DatabaseConfig
	MyServerId   uint32
	ErrorHandler ErrorHandler
	ReadRetries  int
	// contains filtered or unexported fields
}

func (*BinlogStreamer) AddEventListener

func (s *BinlogStreamer) AddEventListener(listener func(*ReplicationEvent) error)

func (*BinlogStreamer) ConnectBinlogStreamerToMysql

func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (BinlogPosition, error)

func (*BinlogStreamer) ConnectBinlogStreamerToMysqlFrom

func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition BinlogPosition) (BinlogPosition, error)

func (*BinlogStreamer) FlushAndStop

func (s *BinlogStreamer) FlushAndStop()

func (*BinlogStreamer) GetLastStreamedBinlogPosition

func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position

func (*BinlogStreamer) IsAlmostCaughtUp

func (s *BinlogStreamer) IsAlmostCaughtUp() bool

func (*BinlogStreamer) Run

func (s *BinlogStreamer) Run()

type BinlogUpdateEvent

type BinlogUpdateEvent struct {
	*DMLEventBase
	// contains filtered or unexported fields
}

func (*BinlogUpdateEvent) AsSQLString

func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, error)

func (*BinlogUpdateEvent) NewValues

func (e *BinlogUpdateEvent) NewValues() RowData

func (*BinlogUpdateEvent) OldValues

func (e *BinlogUpdateEvent) OldValues() RowData

func (*BinlogUpdateEvent) VerifierPaginationKey

func (e *BinlogUpdateEvent) VerifierPaginationKey() (uint64, error)

type BinlogVerifyBatch

type BinlogVerifyBatch struct {
	SchemaName     string
	TableName      string
	PaginationKeys []uint64
}

type BinlogVerifySerializedStore

type BinlogVerifySerializedStore map[string]map[string]map[uint64]int

func (BinlogVerifySerializedStore) Copy

func (BinlogVerifySerializedStore) RowCount

func (s BinlogVerifySerializedStore) RowCount() uint64
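BinlogVerifySerializedStore is a three-level map (schema => table => pagination key => int). The sketch below walks that structure, assuming RowCount counts distinct pagination keys across all schemas and tables rather than summing the int values; the `serializedStore` type and `rowCount` method are hypothetical mirrors for illustration.

```go
package main

import "fmt"

// serializedStore mirrors BinlogVerifySerializedStore:
// schema => table => pagination key => int.
type serializedStore map[string]map[string]map[uint64]int

// rowCount assumes each pagination key is one row awaiting verification,
// so it counts keys across all schemas and tables.
func (s serializedStore) rowCount() uint64 {
	var n uint64
	for _, tables := range s {
		for _, keys := range tables {
			n += uint64(len(keys))
		}
	}
	return n
}

func main() {
	store := serializedStore{
		"shop": {"orders": {1: 1, 2: 1}, "customers": {10: 2}},
	}
	fmt.Println(store.rowCount())
}
```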

type BinlogVerifyStore

type BinlogVerifyStore struct {
	EmitLogPerRowsAdded uint64
	// contains filtered or unexported fields
}

This struct is very similar to ReverifyStore, but it is more optimized for serialization into JSON.

TODO: remove IterativeVerifier and remove this comment.

func NewBinlogVerifyStore

func NewBinlogVerifyStore() *BinlogVerifyStore

func NewBinlogVerifyStoreFromSerialized

func NewBinlogVerifyStoreFromSerialized(serialized BinlogVerifySerializedStore) *BinlogVerifyStore

func (*BinlogVerifyStore) Add

func (s *BinlogVerifyStore) Add(table *TableSchema, paginationKey uint64)

func (*BinlogVerifyStore) Batches

func (s *BinlogVerifyStore) Batches(batchsize int) []BinlogVerifyBatch

func (*BinlogVerifyStore) RemoveVerifiedBatch

func (s *BinlogVerifyStore) RemoveVerifiedBatch(batch BinlogVerifyBatch)

func (*BinlogVerifyStore) Serialize
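Batches splits the keys waiting for verification into BinlogVerifyBatch values of bounded size. The sketch below shows one way to chunk a single table's pending keys; the `batches` helper is hypothetical, and the real store may order or group keys differently.

```go
package main

import (
	"fmt"
	"sort"
)

// verifyBatch mirrors the fields of BinlogVerifyBatch.
type verifyBatch struct {
	SchemaName     string
	TableName      string
	PaginationKeys []uint64
}

// batches is a hypothetical sketch: it splits one table's pending pagination
// keys into batches of at most batchSize keys, sorted for deterministic output.
func batches(schema, table string, keys map[uint64]int, batchSize int) []verifyBatch {
	if batchSize < 1 {
		batchSize = 1 // avoid an infinite loop on a non-positive size
	}
	sorted := make([]uint64, 0, len(keys))
	for k := range keys {
		sorted = append(sorted, k)
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	var out []verifyBatch
	for start := 0; start < len(sorted); start += batchSize {
		end := start + batchSize
		if end > len(sorted) {
			end = len(sorted)
		}
		// Sub-slices share the sorted array; callers treat them as read-only.
		out = append(out, verifyBatch{SchemaName: schema, TableName: table, PaginationKeys: sorted[start:end]})
	}
	return out
}

func main() {
	for _, b := range batches("db", "t", map[uint64]int{5: 1, 1: 1, 3: 1}, 2) {
		fmt.Println(b.SchemaName, b.TableName, b.PaginationKeys)
	}
}
```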

type BinlogWriter

type BinlogWriter struct {
	DB               *sql.DB
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	Throttler        Throttler

	BatchSize          int
	WriteRetries       int
	ApplySchemaChanges bool
	LockStrategy       string

	ErrorHandler                ErrorHandler
	StateTracker                *StateTracker
	ForceResumeStateUpdatesToDB bool

	CopyFilter  CopyFilter
	TableFilter TableFilter
	TableSchema TableSchemaCache
	// contains filtered or unexported fields
}

func NewBinlogWriter

func NewBinlogWriter(f *Ferry) *BinlogWriter

func (*BinlogWriter) BufferBinlogEvents

func (b *BinlogWriter) BufferBinlogEvents(event *ReplicationEvent) error

func (*BinlogWriter) GetWriterState

func (b *BinlogWriter) GetWriterState() (state BinlogWriterState, stateTS time.Time)

func (*BinlogWriter) MarkTableAsCopied

func (b *BinlogWriter) MarkTableAsCopied(table *QualifiedTableName) error

func (*BinlogWriter) ReloadTableSchema

func (b *BinlogWriter) ReloadTableSchema(table *QualifiedTableName) error

func (*BinlogWriter) Run

func (b *BinlogWriter) Run()

func (*BinlogWriter) Stop

func (b *BinlogWriter) Stop()

type BinlogWriterState

type BinlogWriterState string
const (
	WriterStateInit             BinlogWriterState = "Init"
	WriterStateWaitingForEvents BinlogWriterState = "WaitingForEvents"
	WriterStateProcessingEvents BinlogWriterState = "ProcessingEvents"
	WriterStateApplyingEvents   BinlogWriterState = "ApplyingEvents"
	WriterStateAppliedEvents    BinlogWriterState = "AppliedEvents"
)

type CascadingPaginationColumnConfig

type CascadingPaginationColumnConfig struct {
	// FullTableCopies has greatest specificity and takes precedence over the other options
	FullTableCopies map[string][]string // SchemaName => TableNames

	// PerTable has next greatest specificity and takes precedence over the other options
	PerTable map[string]map[string]string // SchemaName => TableName => ColumnName

	// FallbackColumn is a global default to fall back to and is less specific than the
	// default, which is the Primary Key
	FallbackColumn string
}

CascadingPaginationColumnConfig configures the pagination columns to be used. The term `Cascading` denotes that greater specificity takes precedence.

func (*CascadingPaginationColumnConfig) FallbackPaginationColumnName

func (c *CascadingPaginationColumnConfig) FallbackPaginationColumnName() (string, bool)

FallbackPaginationColumnName retrieves the column name specified as a fallback when the Primary Key isn't suitable for pagination.

func (*CascadingPaginationColumnConfig) IsFullCopyTable

func (c *CascadingPaginationColumnConfig) IsFullCopyTable(schemaName, tableName string) bool

func (*CascadingPaginationColumnConfig) PaginationColumnFor

func (c *CascadingPaginationColumnConfig) PaginationColumnFor(schemaName, tableName string) (string, bool)

PaginationColumnFor is a helper function to retrieve the column name to paginate by
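Based on the precedence described in the struct comments (FullTableCopies, then PerTable, then the Primary Key, then FallbackColumn when the Primary Key isn't suitable), the cascading lookup can be sketched as follows. The `resolve` method and its `pkSuitable` input are hypothetical; how Ghostferry decides that a primary key is unsuitable is not shown here.

```go
package main

import "fmt"

// paginationConfig mirrors the fields of CascadingPaginationColumnConfig.
type paginationConfig struct {
	FullTableCopies map[string][]string          // SchemaName => TableNames
	PerTable        map[string]map[string]string // SchemaName => TableName => ColumnName
	FallbackColumn  string
}

// resolve is a hypothetical sketch of the cascading lookup. It reports a
// full-copy table via fullCopy, otherwise the column to paginate by.
// pkSuitable is an assumed input saying whether the primary key can be used.
func (c paginationConfig) resolve(schema, table, primaryKey string, pkSuitable bool) (column string, fullCopy bool, err error) {
	// Most specific: the table is explicitly marked for a full copy.
	for _, t := range c.FullTableCopies[schema] {
		if t == table {
			return "", true, nil
		}
	}
	// Next: an explicit per-table column (indexing nil inner maps is safe).
	if col, ok := c.PerTable[schema][table]; ok {
		return col, false, nil
	}
	// Default: the primary key, if it is suitable for pagination.
	if pkSuitable {
		return primaryKey, false, nil
	}
	// Least specific: the global fallback column.
	if c.FallbackColumn != "" {
		return c.FallbackColumn, false, nil
	}
	return "", false, fmt.Errorf("no usable pagination column for %s.%s", schema, table)
}

func main() {
	cfg := paginationConfig{PerTable: map[string]map[string]string{"db": {"events": "created_id"}}}
	fmt.Println(cfg.resolve("db", "events", "id", true))
}
```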

type ChecksumTableVerifier

type ChecksumTableVerifier struct {
	Tables           []*TableSchema
	DatabaseRewrites map[string]string
	TableRewrites    map[string]string
	SourceDB         *sql.DB
	TargetDB         *sql.DB
	// contains filtered or unexported fields
}

func (*ChecksumTableVerifier) Result

func (*ChecksumTableVerifier) StartInBackground

func (v *ChecksumTableVerifier) StartInBackground() error

func (*ChecksumTableVerifier) VerifyBeforeCutover

func (v *ChecksumTableVerifier) VerifyBeforeCutover() error

func (*ChecksumTableVerifier) VerifyDuringCutover

func (v *ChecksumTableVerifier) VerifyDuringCutover() (VerificationResult, error)

func (*ChecksumTableVerifier) Wait

func (v *ChecksumTableVerifier) Wait()

type ColumnCompressionConfig

type ColumnCompressionConfig map[string]map[string]map[string]string

SchemaName => TableName => ColumnName => CompressionAlgorithm

Example: blog1 => articles => body => snappy (SELECT body FROM blog1.articles returns a compressed blob)

func (ColumnCompressionConfig) CompressedColumnsFor

func (c ColumnCompressionConfig) CompressedColumnsFor(schemaName, tableName string) map[string]string
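Because ColumnCompressionConfig is a plain nested map, a per-table lookup like CompressedColumnsFor can be written without intermediate existence checks: indexing a missing key yields a nil map, and indexing a nil map is also safe in Go. The `columnCompressionConfig` type and `compressedColumnsFor` method below are hypothetical mirrors; the algorithm value "SNAPPY" matches the CompressionSnappy constant.

```go
package main

import "fmt"

// columnCompressionConfig mirrors ColumnCompressionConfig:
// SchemaName => TableName => ColumnName => CompressionAlgorithm.
type columnCompressionConfig map[string]map[string]map[string]string

// compressedColumnsFor returns the column => algorithm map for one table,
// or nil if the schema or table has no compressed columns.
func (c columnCompressionConfig) compressedColumnsFor(schema, table string) map[string]string {
	return c[schema][table]
}

func main() {
	cfg := columnCompressionConfig{
		"blog1": {"articles": {"body": "SNAPPY"}},
	}
	fmt.Println(cfg.compressedColumnsFor("blog1", "articles")["body"])
	fmt.Println(len(cfg.compressedColumnsFor("blog1", "missing")))
}
```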

type ColumnIgnoreConfig

type ColumnIgnoreConfig map[string]map[string]map[string]struct{}

SchemaName => TableName => ColumnName => struct{}{}

These columns will be ignored during inline verification.

func (ColumnIgnoreConfig) IgnoredColumnsFor

func (c ColumnIgnoreConfig) IgnoredColumnsFor(schemaName, tableName string) map[string]struct{}

type CompressionVerifier

type CompressionVerifier struct {
	// contains filtered or unexported fields
}

CompressionVerifier provides support for verifying the payload of compressed columns, which may have different hashes for the same data, by first decompressing the compressed data before fingerprinting it.

func NewCompressionVerifier

func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error)

NewCompressionVerifier first checks the map for supported compression algorithms before initializing and returning the initialized instance.

func (*CompressionVerifier) Decompress

func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error)

Decompress will apply the configured decompression algorithm to the configured column's data.

func (*CompressionVerifier) GetCompressedHashes

func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error)

GetCompressedHashes compares the source data with the target data to ensure the integrity of the data being copied.

The GetCompressedHashes method checks whether the existing table contains compressed data and, if necessary, applies the decompression algorithm to the applicable columns. After the columns are decompressed, the hashes of the data are used to verify equality.

func (*CompressionVerifier) HashRow

func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error)

HashRow will fingerprint the non-primary columns of the row to verify data equality.

func (*CompressionVerifier) IsCompressedTable

func (c *CompressionVerifier) IsCompressedTable(table string) bool

IsCompressedTable will identify whether or not a table is compressed.

type Config

type Config struct {
	// Source database connection configuration
	//
	// Required
	Source *DatabaseConfig

	// Target database connection configuration
	//
	// Required
	Target *DatabaseConfig

	// Map database name on the source database (key of the map) to a
	// different name on the target database (value of the associated key).
	// This allows one to move data and change the database name in the
	// process.
	//
	// Optional: defaults to empty map/no rewrites
	DatabaseRewrites map[string]string

	// Map the table name on the source database to a different name on
	// the target database. See DatabaseRewrites.
	//
	// Optional: defaults to empty map/no rewrites
	TableRewrites map[string]string

	// The maximum number of retries for writes if the writes fail on
	// the target database.
	//
	// Optional: defaults to 5.
	DBWriteRetries int

	// Filter out the databases/tables when detecting the source databases
	// and tables.
	//
	// Required
	TableFilter TableFilter

	// Filter out unwanted data/events from being copied.
	//
	// Optional: defaults to nil/no filter.
	CopyFilter CopyFilter

	// The server id used by Ghostferry to connect to MySQL as a replication
	// slave. This id must be unique on the MySQL server. If 0 is specified,
	// a random id will be generated upon connecting to the MySQL server.
	//
	// Optional: defaults to an automatically generated one
	MyServerId uint32

	// The maximum number of binlog events to write at once. Note this is a
	// maximum: if there are not a lot of binlog events, they will be written
	// one at a time such that the binlog streamer lag is as low as possible.
	// This batch size will only be hit if there are a lot of binlog events at
	// the same time.
	//
	// Optional: defaults to 100
	BinlogEventBatchSize int

	// The batch size used to iterate the data during data copy. This batch size
	// is always used: if this is specified to be 100, 100 rows will be copied
	// per iteration.
	//
	// With the current implementation of Ghostferry, we need to lock the rows
	// we select. This means, the larger this number is, the longer we need to
	// hold this lock. On the flip side, the smaller this number is, the slower
	// the copy will likely be.
	//
	// Optional: defaults to 200
	DataIterationBatchSize uint64

	// The maximum number of retries for reads if the reads fail on the source
	// database.
	//
	// Optional: defaults to 5
	DBReadRetries int

	// This specifies the number of concurrent goroutines, each iterating over
	// a single table.
	//
	// At this point in time, we do not parallelize iteration within a single
	// table. This may be possible to add in the future.
	//
	// Optional: defaults to 4
	DataIterationConcurrency int

	// If set to true, copy data by paginating in reverse order of the
	// pagination key.
	//
	// This can be useful if the pagination key is indicative of the time of
	// insert (higher key values indicate more recent data, such as auto-
	// increment IDs) and the target should receive more recent data before
	// older data.
	//
	// Optional: defaults to false
	IterateInDescendingOrder bool

	// This specifies if the data-iteration/copy should be delayed until the
	// binlog-writer shuts down. This is needed if one wishes to let replication
	// take care of temporary issues that may be preventing the data copy.
	//
	// For instance, if the source DB has schema changes that are not yet
	// applied on the target DB, copy will fail. Setting this to true allows
	// giving the binlog-writer time to catch up to then resume the copy at a
	// later time.
	//
	// This is only meaningful if we can expect an external trigger for shutting
	// down ghostferry at some point, as an automatic cutover will only happen
	// once the data copy is done (which would never happen if the binlog
	// writing is not interrupted).
	//
	// Optional: defaults to false
	DelayDataIterationUntilBinlogWriterShutdown bool

	// This specifies if the data-iteration should temporarily delay failing on
	// copy errors until all tables have at least been attempted to be copied.
	// Errors are still raised, we simply give the copy the opportunity to copy
	// all tables. Since the order of copy of tables is non-deterministic, this
	// simply ensures tables copied without errors are treated as if they were
	// copied first.
	//
	// Optional: defaults to false
	FailOnFirstTableCopyError bool

	// This specifies if Ghostferry will pause before cutover or not.
	//
	// Optional: defaults to false
	AutomaticCutover bool

	// This specifies if Ghostferry performs cutover at all or not.
	//
	// Optional: defaults to false
	DisableCutover bool

	// If true, parse and propagate DB schema changes from the source
	// to the target. This is currently in alpha and does not support
	// all the features of ghostferry, such as
	// - table filters (we don't know if a newly created table should
	//   be generated), or
	// - database/table name rewrites
	//
	// Optional: defaults to false
	ReplicateSchemaChanges bool

	// For migrating data, it is crucial that we're either reading from a master
	// or from a slave that is up-to-date with its master. If we are just
	// continuously replicating/streaming data, it's OK to work on an outdated
	// upstream.
	//
	// Optional: defaults to false
	AllowReplicationFromReplica bool

	// This specifies how to prevent races between the data copy and binlog
	// streaming. Possible values are:
	// - LockOnSourceDB: obtain a table lock on the source table while copying
	//   data, which will prevent any type of data modification on the source
	//   DB; this is the strictest method but may interfere with the
	//   application trying to insert data,
	// - LockInGhostferry: obtain a lock in ghostferry, preventing updates to
	//   the target DB while copying data; this should be sufficient in most
	//   scenarios, and
	// - None: do not perform locking, assume the application does not update
	//   or delete data in a way that races may occur.
	//
	// Optional: defaults to "LockOnSourceDB"
	LockStrategy string

	// This specifies whether or not Ferry.Run will handle SIGINT and SIGTERM
	// by dumping the current state to stdout and the error HTTP callback.
	// The dumped state can be used to resume Ghostferry.
	DumpStateOnSignal bool
	// When dumping state is enabled, the file to which to write the state. If
	// this is not set, use stdout.
	//
	// NOTE: Writing state to disk (rather than stdout) has the benefit that
	// ghostferry can control when it is a good time to write state and when to
	// leave a previously existing state file intact.
	StateFilename string

	// Config for the ControlServer
	ServerBindAddr string
	WebBasedir     string

	// Report progress via an HTTP callback. The Payload field of the callback
	// will be sent to the server as the CustomPayload field in the Progress
	// struct. The unit of ProgressReportFrequency is in milliseconds.
	ProgressCallback        HTTPCallback
	ProgressReportFrequency int

	// The state to resume from as dumped by the PanicErrorHandler.
	// If this is null, a new Ghostferry run will be started. Otherwise, the
	// reconciliation process will start and Ghostferry will resume after that.
	StateToResumeFrom *SerializableState

	// If set (and no serialized state was provided), read the resume state from
	// the target system in this database.
	//
	// NOTE: To avoid collisions between different instances of ghostferry, the
	// system will use the MyServerId to create unique table names within this
	// database.
	ResumeStateFromDB string

	// Enforce writing binlog writer position updates into the "resume-state
	// DB" on every write to the DB (using a transaction). In most cases, this
	// is not required, as double-applying data updates is safe.
	// This is only applicable if ResumeStateFromDB is set.
	//
	// By default we write state only on shutdown, meaning that we may re-apply
	// large batches of updates multiple times if we crash before serializing.
	ForceResumeStateUpdatesToDB bool

	// The verifier to use during the run. Valid choices are:
	// ChecksumTable
	// Iterative
	// Inline
	// NoVerification
	//
	// If it is left blank, the Verifier member variable on the Ferry will be
	// used. If that member variable is nil, no verification will be done.
	VerifierType string

	// Only useful if VerifierType == Iterative.
	// This specifies the configuration for the IterativeVerifier.
	//
	// This option is in the process of being deprecated.
	IterativeVerifierConfig IterativeVerifierConfig

	// Only useful if VerifierType == Inline.
	// This specifies the configuration for the InlineVerifier.
	InlineVerifierConfig InlineVerifierConfig

	// Skips the binlog_row_image check, for old versions (MySQL < 5.6.2,
	// MariaDB < 10.1.6) that do not have this variable.
	// Make sure you have binlog_row_image=FULL when turning this on.
	SkipBinlogRowImageCheck bool

	// This config is necessary for inline verification for a special case of
	// Ghostferry:
	//
	// - If you are copying a table where the data is already partially on the
	//   target through some other means.
	//   - Specifically, the VerifierPaginationKey of this row is the same on
	//     both the source and the target. Thus, INSERT IGNORE will skip copying
	//     this row, leaving the data on the target unchanged.
	//   - If the data on the target is already identical to the source, then
	//     verification will pass and all is well.
	// - However, if this data is compressed with a non-deterministic algorithm
	//   such as snappy, the compressed blob may not be equal even when the
	//   uncompressed data is equal.
	// - This config signals to the InlineVerifier that it needs to decompress
	//   the data to compare identity.
	//
	// Note: a similar option exists in IterativeVerifier. However, the
	// IterativeVerifier is being deprecated and this will be the correct place
	// to specify it if you don't need the IterativeVerifier.
	CompressedColumnsForVerification ColumnCompressionConfig

	// This config is also for inline verification for the same special case of
	// Ghostferry as documented with the CompressedColumnsForVerification option:
	//
	// - If you're copying a table where the data is partially already on
	//   the target through some other means.
	// - A difference in a particular column could be acceptable.
	//   - An example would be a table with a data field and a created_at field.
	//     Maybe the created_at field is not important for data integrity as long
	//     as the data field is correct.
	// - Putting the column in this config will cause the InlineVerifier to skip
	//   this column for verification.
	IgnoredColumnsForVerification ColumnIgnoreConfig

	// Ghostferry requires a single numeric column to paginate over tables. That column is inferred using the following rules, in this exact order:
	// 1. If the table is in the FullCopyTables list, perform a non-paginated copy (only reasonable for small tables).
	// 2. Use the PerTable pagination column, if configured for the table. Fail if this column cannot be found in the table.
	// 3. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric, or if it is a composite key and no FallbackColumn is specified.
	// 4. Use the FallbackColumn pagination column, if configured. Fail if this column cannot be found in the table.
	CascadingPaginationColumnConfig *CascadingPaginationColumnConfig
}
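The four inference rules above can be sketched as a small standalone function. This is a minimal illustration under stated assumptions, not Ghostferry's implementation: `tableInfo`, `paginationConfig`, and `choosePagination` are hypothetical stand-ins for TableSchema and CascadingPaginationColumnConfig, and rule 3 is read as "a non-numeric single-column primary key fails; a composite one falls through to rule 4".

```go
package main

import (
	"errors"
	"fmt"
)

// tableInfo is a hypothetical, simplified stand-in for TableSchema.
type tableInfo struct {
	Name       string
	Columns    map[string]bool // column name -> whether the column is numeric
	PrimaryKey []string        // primary key column names, in key order
}

// paginationConfig is a hypothetical stand-in for CascadingPaginationColumnConfig.
type paginationConfig struct {
	FullCopyTables map[string]bool   // tables copied without pagination
	PerTable       map[string]string // table name -> pagination column
	FallbackColumn string            // used when the primary key is composite
}

// choosePagination applies the four rules in order. It returns fullCopy=true
// (and no column) for tables that should be copied without pagination.
func choosePagination(t tableInfo, c paginationConfig) (column string, fullCopy bool, err error) {
	// Rule 1: tables listed for full copy are not paginated at all.
	if c.FullCopyTables[t.Name] {
		return "", true, nil
	}
	// Rule 2: an explicit per-table column wins, but it must exist.
	if col, ok := c.PerTable[t.Name]; ok {
		if _, exists := t.Columns[col]; !exists {
			return "", false, fmt.Errorf("%s: pagination column %q not found", t.Name, col)
		}
		return col, false, nil
	}
	// Rule 3: a single-column primary key is used if it is numeric.
	if len(t.PrimaryKey) == 1 {
		if !t.Columns[t.PrimaryKey[0]] {
			return "", false, fmt.Errorf("%s: primary key %q is not numeric", t.Name, t.PrimaryKey[0])
		}
		return t.PrimaryKey[0], false, nil
	}
	// Rule 4: composite (or missing) primary key; use the fallback column.
	if c.FallbackColumn == "" {
		return "", false, errors.New(t.Name + ": composite primary key and no FallbackColumn configured")
	}
	if _, exists := t.Columns[c.FallbackColumn]; !exists {
		return "", false, fmt.Errorf("%s: fallback column %q not found", t.Name, c.FallbackColumn)
	}
	return c.FallbackColumn, false, nil
}

func main() {
	cfg := paginationConfig{
		FullCopyTables: map[string]bool{"settings": true},
		PerTable:       map[string]string{"events": "event_id"},
		FallbackColumn: "id",
	}
	orders := tableInfo{
		Name:       "orders",
		Columns:    map[string]bool{"shop_id": true, "id": true},
		PrimaryKey: []string{"shop_id", "id"},
	}
	col, fullCopy, err := choosePagination(orders, cfg)
	fmt.Println(col, fullCopy, err) // id false <nil>
}
```

Returning an error instead of guessing mirrors the "Fail if..." wording of each rule: a misconfigured table stops the run rather than being paginated over an unexpected column.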

func (*Config) ValidateConfig

func (c *Config) ValidateConfig() error

type ControlServer

type ControlServer struct {
	F        *Ferry
	Verifier Verifier
	Addr     string
	Basedir  string
	// contains filtered or unexported fields
}

func (*ControlServer) HandleCutover

func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleIndex

func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandlePause

func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleStatusHealthCheck

func (this *ControlServer) HandleStatusHealthCheck(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleStop

func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleUnpause

func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)

func (*ControlServer) HandleVerify

func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)

func (*ControlServer) Initialize

func (this *ControlServer) Initialize() (err error)

func (*ControlServer) Run

func (this *ControlServer) Run(wg *sync.WaitGroup)

func (*ControlServer) ServeHTTP

func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*ControlServer) Shutdown

func (this *ControlServer) Shutdown() error

type CopyFilter

type CopyFilter interface {
	// BuildSelect is used to set up the query used for batch data copying,
	// allowing for restricting copying to a subset of data. Returning an error
	// here will cause the query to be retried, until the retry limit is
	// reached, at which point the ferry will be aborted. BuildSelect is passed
	// the columns to be selected, the table being copied, the last pagination key value
	// from the previous batch, the batch size, and whether we iterate in
	// descending order. Call DefaultBuildSelect to generate the default query,
	// which may be used as a starting point.
	BuildSelect([]string, *TableSchema, *PaginationKeyData, uint64, bool) (sq.SelectBuilder, error)

	// ApplicableDMLEvent is used to filter events for rows that have been
	// filtered in BuildSelect. ApplicableDMLEvent should return true if the
	// event is for a row that would be selected by BuildSelect, and false
	// otherwise.
	// Returning an error here will cause the ferry to be aborted.
	ApplicableDMLEvent(DMLEvent) (bool, error)
}

CopyFilter provides an interface for restricting the copying to a subset of data. This typically involves adding a WHERE condition in the BuildSelect function, and returning false for unwanted rows in ApplicableDMLEvent.
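To make the division of labour in CopyFilter concrete, here is a hypothetical tenant filter. It is a sketch only: the real BuildSelect receives a *TableSchema and *PaginationKeyData and returns a squirrel SelectBuilder, whereas this version (with the made-up names `tenantFilter`, `buildSelect`, and `applicable`) works on plain SQL strings and a row map so it runs with the standard library alone.

```go
package main

import (
	"fmt"
	"strings"
)

// tenantFilter is a hypothetical filter that copies only the rows of one
// tenant. It mirrors the two CopyFilter responsibilities using plain SQL
// strings instead of squirrel builders.
type tenantFilter struct {
	TenantColumn string
	TenantID     int64
}

// buildSelect builds one keyset-paginated batch query restricted to the
// tenant. lastKey is nil for the first batch.
func (f tenantFilter) buildSelect(columns []string, table, paginationColumn string, lastKey *uint64, batchSize uint64, descending bool) (string, []interface{}) {
	where := []string{f.TenantColumn + " = ?"}
	args := []interface{}{f.TenantID}
	op, order := ">", "ASC"
	if descending {
		op, order = "<", "DESC"
	}
	if lastKey != nil {
		where = append(where, paginationColumn+" "+op+" ?")
		args = append(args, *lastKey)
	}
	query := fmt.Sprintf("SELECT %s FROM %s WHERE %s ORDER BY %s %s LIMIT %d",
		strings.Join(columns, ", "), table, strings.Join(where, " AND "), paginationColumn, order, batchSize)
	return query, args
}

// applicable reports whether a replicated row belongs to the tenant, i.e.
// whether buildSelect could have selected it.
func (f tenantFilter) applicable(row map[string]interface{}) bool {
	v, ok := row[f.TenantColumn].(int64)
	return ok && v == f.TenantID
}

func main() {
	f := tenantFilter{TenantColumn: "tenant_id", TenantID: 42}
	last := uint64(1000)
	q, args := f.buildSelect([]string{"id", "tenant_id", "data"}, "app.orders", "id", &last, 200, false)
	fmt.Println(q)    // SELECT id, tenant_id, data FROM app.orders WHERE tenant_id = ? AND id > ? ORDER BY id ASC LIMIT 200
	fmt.Println(args) // [42 1000]
}
```

The key property is that `applicable` accepts exactly the rows `buildSelect` can return. For update events, a real ApplicableDMLEvent would presumably need to consider both OldValues and NewValues, since an update can move a row into or out of the filtered set.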

type CountMetric

type CountMetric struct {
	MetricBase
	Value int64
}

type CursorConfig

type CursorConfig struct {
	DB        *sql.DB
	Throttler Throttler

	ColumnsToSelect []string
	BuildSelect     func([]string, *TableSchema, *PaginationKeyData, uint64, bool) (squirrel.SelectBuilder, error)
	BatchSize       uint64
	ReadRetries     int

	IterateInDescendingOrder bool
}

func (*CursorConfig) NewFullTableCursor

func (c *CursorConfig) NewFullTableCursor(table *TableSchema, lockOnDB bool, tableLock *sync.RWMutex) *FullTableCursor

returns a new FullTableCursor for the given table

func (*CursorConfig) NewPaginatedCursor

func (c *CursorConfig) NewPaginatedCursor(table *TableSchema, startPaginationKey, maxPaginationKey *PaginationKeyData) *PaginatedCursor

returns a new PaginatedCursor with an embedded copy of itself

func (*CursorConfig) NewPaginatedCursorWithoutRowLock

func (c *CursorConfig) NewPaginatedCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey *PaginationKeyData, tableLock *sync.RWMutex) *PaginatedCursor

returns a new PaginatedCursor with an embedded copy of itself

type DDLEvent

type DDLEvent interface {
	DXLEvent
	SqlCommand() string
}

func NewBinlogDDLEvent

func NewBinlogDDLEvent(command string, affectedTable *QualifiedTableName, pos BinlogPosition, time time.Time) (DDLEvent, error)

type DMLEvent

type DMLEvent interface {
	DXLEvent
	TableSchema() *TableSchema
	OldValues() RowData
	NewValues() RowData
	VerifierPaginationKey() (uint64, error)
}

func NewBinlogDMLEvents

func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos BinlogPosition, time time.Time) ([]DMLEvent, error)

func NewBinlogDeleteEvents

func NewBinlogDeleteEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition, time time.Time) ([]DMLEvent, error)

func NewBinlogInsertEvents

func NewBinlogInsertEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition, time time.Time) ([]DMLEvent, error)

func NewBinlogUpdateEvents

func NewBinlogUpdateEvents(table *TableSchema, rowsEvent *replication.RowsEvent, pos BinlogPosition, time time.Time) ([]DMLEvent, error)

type DMLEventBase

type DMLEventBase struct {
	*DXLEventBase
	// contains filtered or unexported fields
}

func (*DMLEventBase) Database

func (e *DMLEventBase) Database() string

func (*DMLEventBase) Table

func (e *DMLEventBase) Table() string

func (*DMLEventBase) TableSchema

func (e *DMLEventBase) TableSchema() *TableSchema

type DXLEvent

type DXLEvent interface {
	Database() string
	Table() string
	AsSQLString(string, string) (string, error)
	BinlogPosition() BinlogPosition
	EventTime() time.Time
	IsAutoTransaction() bool
}

A DXLEvent is the common base interface for DDL and DML events.

type DXLEventBase

type DXLEventBase struct {
	// contains filtered or unexported fields
}

The base of DMLEvent/DDLEvent to provide the necessary methods.

func (*DXLEventBase) BinlogPosition

func (e *DXLEventBase) BinlogPosition() BinlogPosition

func (*DXLEventBase) EventTime

func (e *DXLEventBase) EventTime() time.Time

func (*DXLEventBase) IsAutoTransaction

func (e *DXLEventBase) IsAutoTransaction() bool

type DXLEventCallback

type DXLEventCallback interface {
	Notify() error
}

type DXLEventWrapper

type DXLEventWrapper struct {
	DXLEvent
	*ReplicationEvent
	PostApplyCallback DXLEventCallback
}

type DataIterator

type DataIterator struct {
	DB                *sql.DB
	Concurrency       int
	SelectFingerprint bool

	ErrorHandler ErrorHandler
	CursorConfig *CursorConfig
	StateTracker *StateTracker
	// contains filtered or unexported fields
}

func NewDataIterator

func NewDataIterator(f *Ferry) *DataIterator

func (*DataIterator) AddBatchListener

func (d *DataIterator) AddBatchListener(listener func(RowBatch) error)

func (*DataIterator) AddDoneListener

func (d *DataIterator) AddDoneListener(listener func() error)

func (*DataIterator) Run

func (d *DataIterator) Run(tables []*TableSchema)

type DataRowBatch

type DataRowBatch struct {
	// contains filtered or unexported fields
}

func NewDataRowBatch

func NewDataRowBatch(table *TableSchema, values []RowData) *DataRowBatch

func (*DataRowBatch) AsSQLQuery

func (e *DataRowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface{}, error)

func (*DataRowBatch) Fingerprints

func (e *DataRowBatch) Fingerprints() map[uint64][]byte

func (*DataRowBatch) IsTableComplete

func (e *DataRowBatch) IsTableComplete() bool

func (*DataRowBatch) Size

func (e *DataRowBatch) Size() int

func (*DataRowBatch) TableSchema

func (e *DataRowBatch) TableSchema() *TableSchema

func (*DataRowBatch) Values

func (e *DataRowBatch) Values() []RowData

func (*DataRowBatch) VerifierPaginationKey

func (e *DataRowBatch) VerifierPaginationKey(rowIndex int) (paginationValue uint64, err error)

type DatabaseConfig

type DatabaseConfig struct {
	Host      string
	Port      uint16
	User      string
	Pass      string
	Collation string
	Params    map[string]string
	// SQL query comments to differentiate Ghostferry's binlog events
	// Optional: defaults to empty string (no comments)
	Marginalia string

	TLS *TLSConfig
}

func (*DatabaseConfig) MySQLConfig

func (c *DatabaseConfig) MySQLConfig() (*mysql.Config, error)

func (*DatabaseConfig) SqlDB

func (c *DatabaseConfig) SqlDB(logger *logrus.Entry) (*sql.DB, error)

func (*DatabaseConfig) Validate

func (c *DatabaseConfig) Validate() error

type ErrorHandler

type ErrorHandler interface {
	// ReportError is usually called from Fatal.
	ReportError(from string, err error)
	Fatal(from string, err error)
}

type Ferry

type Ferry struct {
	*Config

	SourceDB *sql.DB
	TargetDB *sql.DB

	BinlogStreamer *BinlogStreamer
	BinlogWriter   *BinlogWriter

	DataIterator *DataIterator
	BatchWriter  *BatchWriter

	StateTracker                       *StateTracker
	ErrorHandler                       ErrorHandler
	MigrationThrottler                 Throttler
	ReplicationThrottler               Throttler
	WaitUntilReplicaIsCaughtUpToMaster *WaitUntilReplicaIsCaughtUpToMaster

	// This can be specified by the caller. If specified, leave VerifierType in
	// Config unset (or set it to an empty string), or an error will be
	// returned by Initialize.
	//
	// If VerifierType is specified and this is nil on Ferry initialization, a
	// Verifier will be created by Initialize. If an IterativeVerifier is to be
	// created, IterativeVerifierConfig will be used to create the verifier.
	Verifier Verifier

	Tables TableSchemaCache

	StartTime    time.Time
	DoneTime     time.Time
	OverallState string
	// contains filtered or unexported fields
}

func (*Ferry) FlushBinlogAndStopStreaming

func (f *Ferry) FlushBinlogAndStopStreaming()

After you have stopped writing to the source and made sure that all in-flight transactions to the source have completed, call this method to ensure that the binlog streaming has caught up, and then stop the binlog streaming.

This method does not actually shut down the BinlogStreamer immediately. You will know that the BinlogStreamer has finished when .Run() returns.

func (*Ferry) Initialize

func (f *Ferry) Initialize() (err error)

Initializes all the components of Ghostferry and connects to the database.

func (*Ferry) NewBatchWriter

func (f *Ferry) NewBatchWriter() *BatchWriter

func (*Ferry) NewBatchWriterWithoutStateTracker

func (f *Ferry) NewBatchWriterWithoutStateTracker() *BatchWriter

func (*Ferry) NewBinlogStreamer

func (f *Ferry) NewBinlogStreamer() *BinlogStreamer

func (*Ferry) NewBinlogWriter

func (f *Ferry) NewBinlogWriter() *BinlogWriter

func (*Ferry) NewBinlogWriterWithoutStateTracker

func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter

func (*Ferry) NewChecksumTableVerifier

func (f *Ferry) NewChecksumTableVerifier() *ChecksumTableVerifier

func (*Ferry) NewDataIterator

func (f *Ferry) NewDataIterator() *DataIterator

func (*Ferry) NewDataIteratorWithoutStateTracker

func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator

func (*Ferry) NewInlineVerifier

func (f *Ferry) NewInlineVerifier() *InlineVerifier

func (*Ferry) NewInlineVerifierWithoutStateTracker

func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier

func (*Ferry) NewIterativeVerifier

func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error)

func (*Ferry) Progress

func (f *Ferry) Progress() *Progress

func (*Ferry) ReportProgress

func (f *Ferry) ReportProgress()

func (*Ferry) Run

func (f *Ferry) Run()

Spawns the background tasks that actually perform the run, and waits for them to finish.

func (*Ferry) RunStandaloneDataCopy

func (f *Ferry) RunStandaloneDataCopy(tables []*TableSchema) error

func (*Ferry) SerializeStateToJSON

func (f *Ferry) SerializeStateToJSON() (string, error)

func (*Ferry) Start

func (f *Ferry) Start() error

Attaches event listeners for Ghostferry components and connects the binlog streamer to the source shard.

Note: Binlog streaming doesn't start until Run. Here we simply connect to MySQL.

func (*Ferry) WaitUntilBinlogStreamerCatchesUp

func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()

func (*Ferry) WaitUntilRowCopyIsComplete

func (f *Ferry) WaitUntilRowCopyIsComplete()

Call this method and perform the cutover after this method returns.

type FinalizeTableCopyBatch

type FinalizeTableCopyBatch struct {
	// contains filtered or unexported fields
}

func NewFinalizeTableCopyBatch

func NewFinalizeTableCopyBatch(table *TableSchema) *FinalizeTableCopyBatch

func (*FinalizeTableCopyBatch) AsSQLQuery

func (e *FinalizeTableCopyBatch) AsSQLQuery(schemaName, tableName string) (string, []interface{}, error)

func (*FinalizeTableCopyBatch) IsTableComplete

func (e *FinalizeTableCopyBatch) IsTableComplete() bool

func (*FinalizeTableCopyBatch) Size

func (e *FinalizeTableCopyBatch) Size() int

func (*FinalizeTableCopyBatch) TableSchema

func (e *FinalizeTableCopyBatch) TableSchema() *TableSchema

type FullTableCursor

type FullTableCursor struct {
	DB          *sql.DB
	Table       *TableSchema
	BatchSize   uint64
	ReadRetries int
	// contains filtered or unexported fields
}

func (*FullTableCursor) Each

func (c *FullTableCursor) Each(f func(RowBatch) error) error

func (*FullTableCursor) Fetch

func (c *FullTableCursor) Fetch(db SqlPreparer, rowOffset int) (batch InsertRowBatch, err error)

type GaugeMetric

type GaugeMetric struct {
	MetricBase
	Value float64
}

type HTTPCallback

type HTTPCallback struct {
	URI     string
	Payload string
}

func (HTTPCallback) Post

func (h HTTPCallback) Post(client *http.Client) error

type IncompleteVerificationError

type IncompleteVerificationError struct{}

func (IncompleteVerificationError) Error

type InlineVerifier

type InlineVerifier struct {
	SourceDB                   *sql.DB
	TargetDB                   *sql.DB
	DatabaseRewrites           map[string]string
	TableRewrites              map[string]string
	CopyFilter                 CopyFilter
	TableSchemaCache           TableSchemaCache
	BatchSize                  int
	VerifyBinlogEventsInterval time.Duration
	MaxExpectedDowntime        time.Duration

	StateTracker *StateTracker
	ErrorHandler ErrorHandler
	// contains filtered or unexported fields
}

func (*InlineVerifier) CheckFingerprintInline

func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch InsertRowBatch) ([]uint64, error)

func (*InlineVerifier) PeriodicallyVerifyBinlogEvents

func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context)

func (*InlineVerifier) Result

func (*InlineVerifier) StartInBackground

func (v *InlineVerifier) StartInBackground() error

func (*InlineVerifier) VerifyBeforeCutover

func (v *InlineVerifier) VerifyBeforeCutover() error

func (*InlineVerifier) VerifyDuringCutover

func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error)

func (*InlineVerifier) Wait

func (v *InlineVerifier) Wait()

type InlineVerifierConfig

type InlineVerifierConfig struct {
	// The maximum expected downtime during cutover, in the format of
	// time.ParseDuration. If nothing is specified, the InlineVerifier will not
	// try to estimate the downtime and will always allow cutover.
	MaxExpectedDowntime string

	// The interval at which the periodic binlog reverification occurs, in the
	// format of time.ParseDuration. Default: 1s.
	VerifyBinlogEventsInterval string
	// contains filtered or unexported fields
}

func (*InlineVerifierConfig) Validate

func (c *InlineVerifierConfig) Validate() error

type InsertRowBatch

type InsertRowBatch interface {
	RowBatch
	Values() []RowData
	VerifierPaginationKey(int) (uint64, error)
	Fingerprints() map[uint64][]byte
}

type IterativeVerifier

type IterativeVerifier struct {
	CompressionVerifier *CompressionVerifier
	CursorConfig        *CursorConfig
	BinlogStreamer      *BinlogStreamer
	TableSchemaCache    TableSchemaCache
	SourceDB            *sql.DB
	TargetDB            *sql.DB

	Tables              []*TableSchema
	IgnoredTables       []string
	IgnoredColumns      map[string]map[string]struct{}
	DatabaseRewrites    map[string]string
	TableRewrites       map[string]string
	CopyFilter          CopyFilter
	Concurrency         int
	MaxExpectedDowntime time.Duration
	// contains filtered or unexported fields
}

func (*IterativeVerifier) GetHashes

func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error)

func (*IterativeVerifier) Initialize

func (v *IterativeVerifier) Initialize() error

func (*IterativeVerifier) Result

func (*IterativeVerifier) SanityCheckParameters

func (v *IterativeVerifier) SanityCheckParameters() error

func (*IterativeVerifier) StartInBackground

func (v *IterativeVerifier) StartInBackground() error

func (*IterativeVerifier) VerifyBeforeCutover

func (v *IterativeVerifier) VerifyBeforeCutover() error

func (*IterativeVerifier) VerifyDuringCutover

func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)

func (*IterativeVerifier) VerifyOnce

func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)

func (*IterativeVerifier) Wait

func (v *IterativeVerifier) Wait()

type IterativeVerifierConfig

type IterativeVerifierConfig struct {
	// List of tables that should be ignored by the IterativeVerifier.
	IgnoredTables []string

	// List of columns that should be ignored by the IterativeVerifier.
	// This is in the format of table_name -> [list of column names]
	IgnoredColumns map[string][]string

	// The number of concurrent verifiers. Note that a single table can only be
	// assigned to one goroutine; multiple goroutines per table are currently
	// not supported.
	Concurrency int

	// The maximum expected downtime during cutover, in the format of
	// time.ParseDuration.
	MaxExpectedDowntime string

	// Map of the table and column identifying the compression type
	// (if any) of the column. This is used during verification to ensure
	// the data was successfully copied as some compression algorithms can
	// output different compressed data with the same input data.
	//
	// The data structure is a map of table names to a map of column names
	// to the compression algorithm.
	// ex: {books: {contents: snappy}}
	//
	// Currently supported compression algorithms are:
	//	1. Snappy (https://google.github.io/snappy/) as "SNAPPY"
	//
	// Optional: defaults to empty map/no compression
	//
	// Note that the IterativeVerifier is in the process of being deprecated.
	// If this is specified, CompressedColumnsForVerification (a
	// ColumnCompressionConfig) should also be filled out in the main Config.
	TableColumnCompression TableColumnCompressionConfig
}
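IterativeVerifierConfig stores IgnoredColumns as table -> list of columns, while the IterativeVerifier struct holds them as map[string]map[string]struct{}. A sketch of that conversion, assuming it is a plain list-to-set transformation; `toColumnSets` and `isIgnored` are hypothetical names:

```go
package main

import "fmt"

// toColumnSets (hypothetical) converts the config format (table -> list of
// columns) into a set per table, giving O(1) membership checks.
func toColumnSets(ignored map[string][]string) map[string]map[string]struct{} {
	sets := make(map[string]map[string]struct{}, len(ignored))
	for table, columns := range ignored {
		set := make(map[string]struct{}, len(columns))
		for _, column := range columns {
			set[column] = struct{}{} // duplicates collapse into one entry
		}
		sets[table] = set
	}
	return sets
}

// isIgnored (hypothetical) reports whether a column should be skipped during
// verification. Looking up a missing table yields a nil map, which is safe.
func isIgnored(sets map[string]map[string]struct{}, table, column string) bool {
	_, ok := sets[table][column]
	return ok
}

func main() {
	sets := toColumnSets(map[string][]string{"books": {"created_at", "updated_at"}})
	fmt.Println(isIgnored(sets, "books", "created_at"), isIgnored(sets, "books", "title")) // true false
}
```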

func (*IterativeVerifierConfig) Validate

func (c *IterativeVerifierConfig) Validate() error

type LagThrottler

type LagThrottler struct {
	ThrottlerBase
	PauserThrottler

	DB *sql.DB
	// contains filtered or unexported fields
}

func NewLagThrottler

func NewLagThrottler(config *LagThrottlerConfig) (*LagThrottler, error)

func (*LagThrottler) Run

func (t *LagThrottler) Run(ctx context.Context) error

func (*LagThrottler) Throttled

func (t *LagThrottler) Throttled() bool

type LagThrottlerConfig

type LagThrottlerConfig struct {
	Connection     *DatabaseConfig
	MaxLag         int
	Query          string
	UpdateInterval string
}

type MetricBase

type MetricBase struct {
	Key        string
	Tags       []MetricTag
	SampleRate float64
}

type MetricTag

type MetricTag struct {
	Name  string
	Value string
}

type Metrics

type Metrics struct {
	Prefix      string
	DefaultTags []MetricTag
	Sink        chan interface{}
	// contains filtered or unexported fields
}

func SetGlobalMetrics

func SetGlobalMetrics(prefix string, sink chan interface{}) *Metrics

func (*Metrics) AddConsumer

func (m *Metrics) AddConsumer()

func (*Metrics) Count

func (m *Metrics) Count(key string, value int64, tags []MetricTag, sampleRate float64)

func (*Metrics) DoneConsumer

func (m *Metrics) DoneConsumer()

func (*Metrics) Gauge

func (m *Metrics) Gauge(key string, value float64, tags []MetricTag, sampleRate float64)

func (*Metrics) Measure

func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f func())

func (*Metrics) StopAndFlush

func (m *Metrics) StopAndFlush()

func (*Metrics) Timer

func (m *Metrics) Timer(key string, duration time.Duration, tags []MetricTag, sampleRate float64)

type PaginatedCursor

type PaginatedCursor struct {
	CursorConfig

	Table            *TableSchema
	MaxPaginationKey *PaginationKeyData
	RowLock          bool
	// contains filtered or unexported fields
}

func (*PaginatedCursor) Each

func (c *PaginatedCursor) Each(f func(RowBatch) error) error

func (*PaginatedCursor) Fetch

func (c *PaginatedCursor) Fetch(db SqlPreparer) (batch InsertRowBatch, paginationKeyData *PaginationKeyData, err error)

type PaginationKey

type PaginationKey struct {
	// The sorted list of columns
	Columns []*schema.TableColumn

	// The sorted indices of the columns as they appear in the table
	ColumnIndices []int

	// Optional index of the column that we consider most indicative of
	// progress. Such an entry may not exist and can be set to -1 to explicitly
	// indicate that it does not exist.
	// Used only for estimating the position of a particular value within the
	// range of the pagination.
	MostSignificantColumnIndex int
}

func (PaginationKey) IsLinearUnsignedKey

func (k PaginationKey) IsLinearUnsignedKey() bool

func (PaginationKey) String

func (k PaginationKey) String() string

type PaginationKeyData

type PaginationKeyData struct {
	// Values is the subset of column values of a full row that makes up
	// the pagination key. The values are stored in the same order as the
	// pagination key columns.
	Values RowData
	// contains filtered or unexported fields
}
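To illustrate how PaginationKey.ColumnIndices and PaginationKeyData.Values relate, the sketch below extracts the key values from a full row and compares two composite keys. It assumes integer-only keys and lexicographic ordering (most significant column first, as in a composite index); `keyValues` and `compareKeys` are hypothetical and are not the library's Compare implementation.

```go
package main

import "fmt"

// keyValues (hypothetical) extracts the pagination key values from a full
// row, using the column indices of the key as in PaginationKey.ColumnIndices.
func keyValues(row []int64, columnIndices []int) []int64 {
	values := make([]int64, len(columnIndices))
	for i, idx := range columnIndices {
		values[i] = row[idx]
	}
	return values
}

// compareKeys (hypothetical) compares two composite integer keys
// lexicographically and returns -1, 0 or +1. The lexicographic ordering is
// an assumption of this sketch.
func compareKeys(a, b []int64) int {
	for i := 0; i < len(a) && i < len(b); i++ {
		switch {
		case a[i] < b[i]:
			return -1
		case a[i] > b[i]:
			return 1
		}
	}
	switch {
	case len(a) < len(b):
		return -1
	case len(a) > len(b):
		return 1
	}
	return 0
}

func main() {
	// Row layout: (amount, shop_id, order_no); the key is (shop_id, order_no).
	indices := []int{1, 2}
	a := keyValues([]int64{500, 1, 10}, indices)
	b := keyValues([]int64{700, 2, 3}, indices)
	fmt.Println(a, b, compareKeys(a, b)) // [1 10] [2 3] -1
}
```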

func NewPaginationKeyDataFromRow

func NewPaginationKeyDataFromRow(row RowData, paginationKey *PaginationKey) (paginationKeyData *PaginationKeyData, err error)

func UnmarshalPaginationKeyData

func UnmarshalPaginationKeyData(keyData *PaginationKeyData, table *TableSchema) (paginationKeyData *PaginationKeyData, err error)

func (*PaginationKeyData) Compare

func (d *PaginationKeyData) Compare(other *PaginationKeyData) int

func (PaginationKeyData) ProgressData

func (d PaginationKeyData) ProgressData() (progress uint64, exists bool)

For some types of keys, we can estimate the progress of pagination by comparing the most significant part of the key to the target pagination value. It is only a rough estimate, as it assumes a linear distribution of values between 0 and the target (no negative values, no holes or jumps, etc.) and cannot work for non-integer keys, but it is nevertheless useful.
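The linear estimate described above reduces to a single division. A hypothetical helper, restricted to integer keys and assuming the estimate is expressed as a fraction of the target value:

```go
package main

import "fmt"

// estimateProgress (hypothetical) returns the fraction of the range
// [0, target] already paginated, assuming values of the most significant
// key column are spread linearly over that range. ok is false when no
// estimate is possible (negative values or a non-positive target).
func estimateProgress(current, target int64) (fraction float64, ok bool) {
	if current < 0 || target <= 0 {
		return 0, false
	}
	if current >= target {
		return 1, true // clamp: the key may overshoot the recorded target
	}
	return float64(current) / float64(target), true
}

func main() {
	f, ok := estimateProgress(250000, 1000000)
	fmt.Printf("%.0f%% %v\n", f*100, ok) // 25% true
}
```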

func (PaginationKeyData) String

func (d PaginationKeyData) String() string

type PaginationKeyPositionLog

type PaginationKeyPositionLog struct {
	Position uint64
	At       time.Time
}

For tracking the speed of the copy

type PanicErrorHandler

type PanicErrorHandler struct {
	Ferry             *Ferry
	ErrorCallback     HTTPCallback
	DumpState         bool
	DumpStateFilename string
	// contains filtered or unexported fields
}

func (*PanicErrorHandler) Fatal

func (this *PanicErrorHandler) Fatal(from string, err error)

func (*PanicErrorHandler) ReportError

func (this *PanicErrorHandler) ReportError(from string, err error)

type PauserThrottler

type PauserThrottler struct {
	ThrottlerBase
	// contains filtered or unexported fields
}

func (*PauserThrottler) Run

func (t *PauserThrottler) Run(ctx context.Context) error

func (*PauserThrottler) SetPaused

func (t *PauserThrottler) SetPaused(paused bool)

func (*PauserThrottler) Throttled

func (t *PauserThrottler) Throttled() bool

type Progress

type Progress struct {
	// Possible values are defined in ferry.go
	// Shows what the ferry is currently doing in one word.
	CurrentState string

	// The Payload field of the ProgressCallback config will be copied to here
	// verbatim.
	// Example use case: if you send all status reports to an aggregation
	// server, you can use this field for some sort of custom identification.
	CustomPayload string

	Tables                  map[string]TableProgress
	LastSuccessfulBinlogPos mysql.Position
	BinlogStreamerLag       float64 // seconds
	Throttled               bool

	// The behaviour of Ghostferry varies with respect to the VerifierType.
	// For example: a long cutover is OK if
	VerifierType string

	// These fields are only filled when CurrentState == done.
	FinalBinlogPos mysql.Position

	// A best estimate of the speed at which the copying is taking place. If
	// there are large gaps in the VerifierPaginationKey space, this estimate
	// will probably be inaccurate.
	PaginationKeysPerSecond uint64
	ETA                     float64 // seconds
	TimeTaken               float64 // seconds
}

type QualifiedTableName

type QualifiedTableName struct {
	SchemaName string
	TableName  string
}

func NewQualifiedTableName

func NewQualifiedTableName(schemaName, tableName string) QualifiedTableName

func (QualifiedTableName) String

func (n QualifiedTableName) String() string

type QueryAnalyzer

type QueryAnalyzer struct {
	// contains filtered or unexported fields
}

func NewQueryAnalyzer

func NewQueryAnalyzer() *QueryAnalyzer

func (*QueryAnalyzer) ParseSchemaChanges

func (q *QueryAnalyzer) ParseSchemaChanges(sqlStatement string, schemaOfStatement string) ([]*SchemaEvent, error)

type ReloadTableSchemasCallback

type ReloadTableSchemasCallback struct {
	*BinlogWriter
	TableStructuresToReload []*QualifiedTableName
}

func (*ReloadTableSchemasCallback) Notify

func (c *ReloadTableSchemasCallback) Notify() error

type ReplicatedMasterPositionFetcher

type ReplicatedMasterPositionFetcher interface {
	Current(*sql.DB) (mysql.Position, error)
}

type ReplicatedMasterPositionViaCustomQuery

type ReplicatedMasterPositionViaCustomQuery struct {
	// The custom query should return a single row with two values:
	// the string file and the integer position. For pt-heartbeat, this query
	// would be:
	//
	// "SELECT file, position FROM meta.ptheartbeat WHERE server_id = %d" % serverId
	//
	// where serverId is the master server id, and meta.ptheartbeat is the table
	// where pt-heartbeat writes to.
	//
	// For pt-heartbeat in particular, you should not use the
	// relay_master_log_file and exec_master_log_pos of the DB being replicated
	// as these values are not the master binlog positions.
	Query string
}

Selects the master position that we have replicated until from a heartbeat table of some sort.
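In Go, the pt-heartbeat query from the comment above would be built with fmt.Sprintf rather than the `%` operator shown there. A sketch, assuming the meta.ptheartbeat table from that example; `heartbeatQuery` and `currentPosition` are hypothetical helpers, and the resulting string is what would go into the Query field:

```go
package main

import (
	"database/sql"
	"fmt"
)

// heartbeatQuery (hypothetical) builds the pt-heartbeat query for the given
// master server id. meta.ptheartbeat is the example table from the comment;
// adjust it to wherever pt-heartbeat writes in your setup.
func heartbeatQuery(masterServerID uint32) string {
	return fmt.Sprintf("SELECT file, position FROM meta.ptheartbeat WHERE server_id = %d", masterServerID)
}

// currentPosition (hypothetical) runs the query and scans the single expected
// row into a binlog file name and position.
func currentPosition(db *sql.DB, query string) (file string, pos uint32, err error) {
	err = db.QueryRow(query).Scan(&file, &pos)
	return file, pos, err
}

func main() {
	fmt.Println(heartbeatQuery(1)) // SELECT file, position FROM meta.ptheartbeat WHERE server_id = 1
	_ = currentPosition            // needs a live *sql.DB with a MySQL driver
}
```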

func (ReplicatedMasterPositionViaCustomQuery) Current

type ReplicationEvent

type ReplicationEvent struct {
	BinlogPosition BinlogPosition
	BinlogEvent    *replication.BinlogEvent
	EventTime      time.Time
}

type ReverifyBatch

type ReverifyBatch struct {
	PaginationKeys []uint64
	Table          TableIdentifier
}

type ReverifyEntry

type ReverifyEntry struct {
	PaginationKey uint64
	Table         *TableSchema
}

type ReverifyStore

type ReverifyStore struct {
	MapStore map[TableIdentifier]map[uint64]struct{}

	BatchStore         []ReverifyBatch
	RowCount           uint64
	EmitLogPerRowCount uint64
	// contains filtered or unexported fields
}

func NewReverifyStore

func NewReverifyStore() *ReverifyStore

func (*ReverifyStore) Add

func (r *ReverifyStore) Add(entry ReverifyEntry)

func (*ReverifyStore) FlushAndBatchByTable

func (r *ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch

type RowBatch

type RowBatch interface {
	TableSchema() *TableSchema
	AsSQLQuery(schemaName, tableName string) (string, []interface{}, error)
	Size() int
	IsTableComplete() bool
}

type RowData

type RowData []interface{}

func ScanGenericRow

func ScanGenericRow(rows *sqlorig.Rows, columnCount int) (RowData, error)

func (RowData) GetInt64

func (r RowData) GetInt64(colIdx int) (res int64, err error)

The MySQL driver never actually returns a uint64 from Scan. Instead, you get an int64 for values that fit in an int64, or a byte slice holding the uint64 value as a decimal string. However, other code in Ghostferry generates DMLEvents that may contain unsigned integer types, so this helper method unifies the reading of such columns.

func (RowData) GetString

func (r RowData) GetString(colIdx int) string

type SchemaEvent

type SchemaEvent struct {
	// The SQL statement of the event as returned by the SQL statement parser
	SchemaStatement string
	// Does this event modify the structure/schema of the DB? ALTER or CREATE table statements alter the schema,
	// but TRUNCATE does not. RENAME statements also alter the structure of which tables are available, and are
	// thus also considered schema changes
	IsSchemaChange bool
	// "affected" is always the table on which an event operates (created, deleted, truncated, altered, etc.).
	// For the corner case of a renamed table, it refers to the original table.
	// The idea is to have one field that exposes which previously existing tables are affected by an
	// event; it is *NOT* the deleted table
	AffectedTable *QualifiedTableName
	// the created table as part of a create/rename operation. This can overlap with "affected" tables
	CreatedTable *QualifiedTableName
	// the deleted table as part of a delete/rename operation. This can overlap with "affected" tables
	DeletedTable *QualifiedTableName
}

type SerializableState

type SerializableState struct {
	GhostferryVersion         string
	LastKnownTableSchemaCache TableSchemaCache

	LastSuccessfulPaginationKeys              map[string]*PaginationKeyData
	CompletedTables                           map[string]bool
	LastWrittenBinlogPosition                 BinlogPosition
	LastStoredBinlogPositionForInlineVerifier BinlogPosition
	BinlogVerifyStore                         BinlogVerifySerializedStore
}

func (*SerializableState) MinBinlogPosition

func (s *SerializableState) MinBinlogPosition() BinlogPosition

type SqlDBWithFakeRollback

type SqlDBWithFakeRollback struct {
	*sql.DB
	// contains filtered or unexported fields
}

func NewSqlDBWithFakeRollback

func NewSqlDBWithFakeRollback(db *sql.DB, lock *sync.RWMutex) *SqlDBWithFakeRollback

func (*SqlDBWithFakeRollback) Rollback

func (d *SqlDBWithFakeRollback) Rollback() error

type SqlPreparer

type SqlPreparer interface {
	Prepare(string) (*sqlorig.Stmt, error)
}

Both `sql.Tx` and `sql.DB` allow a SQL query to be `Prepare`d.

type SqlPreparerAndRollbacker

type SqlPreparerAndRollbacker interface {
	SqlPreparer
	Query(query string, args ...interface{}) (*sqlorig.Rows, error)
	Rollback() error
}

sql.DB does not implement Rollback, but it can be wrapped in SqlDBWithFakeRollback, whose Rollback performs a no-op.
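The idea can be sketched with the standard database/sql package alone. The names preparerAndRollbacker and dbWithNoopRollback below are hypothetical stand-ins for SqlPreparerAndRollbacker and SqlDBWithFakeRollback; the real wrapper also takes a *sync.RWMutex, which this sketch deliberately omits.

```go
package main

import (
	"database/sql"
	"fmt"
)

// preparerAndRollbacker mirrors the documented SqlPreparerAndRollbacker shape.
type preparerAndRollbacker interface {
	Prepare(query string) (*sql.Stmt, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
	Rollback() error
}

// dbWithNoopRollback embeds *sql.DB, which promotes Prepare and Query,
// and adds a Rollback that does nothing.
type dbWithNoopRollback struct {
	*sql.DB
}

func (d *dbWithNoopRollback) Rollback() error { return nil }

// Compile-time checks: a transaction and the wrapped DB both satisfy the interface.
var (
	_ preparerAndRollbacker = (*sql.Tx)(nil)
	_ preparerAndRollbacker = (*dbWithNoopRollback)(nil)
)

func main() {
	// No database connection is needed to show that Rollback is a no-op.
	var p preparerAndRollbacker = &dbWithNoopRollback{}
	fmt.Println(p.Rollback())
}
```

Code written against the interface can then call Rollback unconditionally on error paths, whether it holds a transaction or a plain connection pool.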

type StateTracker

type StateTracker struct {
	BinlogRWMutex *sync.RWMutex
	CopyRWMutex   *sync.RWMutex
	// contains filtered or unexported fields
}

func NewStateTracker

func NewStateTracker(speedLogCount int) *StateTracker

func NewStateTrackerFromSerializedState

func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *SerializableState, tables TableSchemaCache) (*StateTracker, error)

serializedState is a state the tracker should start from, as opposed to starting from the beginning.

func (*StateTracker) EstimatedPaginationKeysPerSecond

func (s *StateTracker) EstimatedPaginationKeysPerSecond() float64

This is reasonably accurate if the copied rows are distributed uniformly between paginationKey = 0 and max(paginationKey). It is inaccurate if the rows are concentrated in a particular region of the key range.

func (*StateTracker) GetStoreBinlogWriterPositionSql

func (s *StateTracker) GetStoreBinlogWriterPositionSql(pos BinlogPosition, lastEventTs time.Time) (sqlStr string, args []interface{}, err error)

func (*StateTracker) GetStoreInlineVerifierPositionSql

func (s *StateTracker) GetStoreInlineVerifierPositionSql(pos BinlogPosition) (sqlStr string, args []interface{}, err error)

func (*StateTracker) GetStoreRowCopyDoneSql

func (s *StateTracker) GetStoreRowCopyDoneSql(tableName string) (sqlStr string, args []interface{}, err error)

func (*StateTracker) GetStoreRowCopyPositionSql

func (s *StateTracker) GetStoreRowCopyPositionSql(tableName string, endPaginationKey *PaginationKeyData) (sqlStr string, args []interface{}, err error)

func (*StateTracker) GetTableLock

func (s *StateTracker) GetTableLock(table string) *sync.RWMutex

func (*StateTracker) IsTableComplete

func (s *StateTracker) IsTableComplete(table string) bool

func (*StateTracker) LastSuccessfulPaginationKey

func (s *StateTracker) LastSuccessfulPaginationKey(table string) (paginationKeyData *PaginationKeyData, completed bool)

func (*StateTracker) MarkTableAsCompleted

func (s *StateTracker) MarkTableAsCompleted(table string)

func (*StateTracker) Serialize

func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore) *SerializableState

func (*StateTracker) SerializeToDB

func (s *StateTracker) SerializeToDB(db *sql.DB) error

func (*StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier

func (s *StateTracker) UpdateLastStoredBinlogPositionForInlineVerifier(pos BinlogPosition)

func (*StateTracker) UpdateLastSuccessfulPaginationKey

func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginationKey *PaginationKeyData)

func (*StateTracker) UpdateLastWrittenBinlogPosition

func (s *StateTracker) UpdateLastWrittenBinlogPosition(pos BinlogPosition)

type StatusDeprecated

type StatusDeprecated struct {
	GhostferryVersion string

	SourceHostPort string
	TargetHostPort string

	OverallState            string
	StartTime               time.Time
	CurrentTime             time.Time
	TimeTaken               time.Duration
	ETA                     time.Duration
	BinlogStreamerLag       time.Duration
	PaginationKeysPerSecond uint64

	BinlogWriterState      BinlogWriterState
	BinlogWriterStateTs    time.Time
	BinlogWriterStateTsAge time.Duration

	AutomaticCutover            bool
	BinlogStreamerStopRequested bool
	LastSuccessfulBinlogPos     mysql.Position
	TargetBinlogPos             mysql.Position

	// for backwards compatibility, a union of the detailed stats below
	Throttled            bool
	MigrationThrottled   bool
	ReplicationThrottled bool

	CompletedTableCount int
	TotalTableCount     int
	TableStatuses       []*TableStatusDeprecated
	AllTableNames       []string
	AllDatabaseNames    []string

	VerifierSupport     bool
	VerifierAvailable   bool
	VerificationStarted bool
	VerificationDone    bool
	VerificationResult  VerificationResult
	VerificationErr     error
}

func FetchStatusDeprecated

func FetchStatusDeprecated(f *Ferry, v Verifier) *StatusDeprecated

type StmtCache

type StmtCache struct {
	// contains filtered or unexported fields
}

func NewStmtCache

func NewStmtCache() *StmtCache

func (*StmtCache) StmtFor

func (c *StmtCache) StmtFor(p SqlPreparer, query string) (*sqlorig.Stmt, error)

type TLSConfig

type TLSConfig struct {
	CertPath   string
	ServerName string
	// contains filtered or unexported fields
}

func (*TLSConfig) BuildConfig

func (this *TLSConfig) BuildConfig() (*tls.Config, error)

type TableColumnCompressionConfig

type TableColumnCompressionConfig map[string]map[string]string

TableColumnCompressionConfig represents the compression configuration of table columns as a nested map of table -> column -> compression type, e.g. books -> contents -> snappy.
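The nested-map shape can be shown with a local copy of the type. The lookups below rely on a standard Go property: indexing a nil inner map is safe. Because of that, a table or column without configured compression simply yields an empty string.

```go
package main

import "fmt"

// Local copy of the documented type: table -> column -> compression type.
type TableColumnCompressionConfig map[string]map[string]string

func main() {
	config := TableColumnCompressionConfig{
		"books": {"contents": "snappy"},
	}

	fmt.Println(config["books"]["contents"])    // snappy
	fmt.Println(config["authors"]["bio"] == "") // true: missing entries read as ""
}
```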

type TableFilter

type TableFilter interface {
	ApplicableTables([]*TableSchema) ([]*TableSchema, error)
	ApplicableDatabases([]string) ([]string, error)
}

type TableForeignKeys

type TableForeignKeys map[QualifiedTableName]bool

TableForeignKeys defines a simple set of table names.

func GetForeignKeyTablesOfTable

func GetForeignKeyTablesOfTable(db *sql.DB, table QualifiedTableName, includeSelfReferences bool) (TableForeignKeys, error)

type TableIdentifier

type TableIdentifier struct {
	SchemaName string
	TableName  string
}

A comparable and lightweight type that stores the schema and table name.

func NewTableIdentifierFromSchemaTable

func NewTableIdentifierFromSchemaTable(table *TableSchema) TableIdentifier

type TableProgress

type TableProgress struct {
	LastSuccessfulPaginationKey string
	TargetPaginationKey         string
	CurrentAction               string // Possible values are defined via the constants TableAction*
}

type TableSchema

type TableSchema struct {
	*schema.Table

	CompressedColumnsForVerification map[string]string   // Map of column name => compression type
	IgnoredColumnsForVerification    map[string]struct{} // Set of column name
	PaginationKey                    *PaginationKey
	// contains filtered or unexported fields
}

This is a wrapper around schema.Table with some custom information we need.

func GetTargetPaginationKeys

func GetTargetPaginationKeys(db *sql.DB, tables []*TableSchema, iterateInDescendingOrder bool, logger *logrus.Entry) (paginatedTables map[*TableSchema]*PaginationKeyData, unpaginatedTables []*TableSchema, err error)

func (*TableSchema) FingerprintQuery

func (t *TableSchema) FingerprintQuery(schemaName, tableName string, numRows int) (string, error)

This query returns the MD5 hash for a row on this table. This query is valid for both the source and the target shard.

Any compressed columns specified via CompressedColumnsForVerification are excluded from this checksum, and their raw data is returned directly.

Any columns specified in IgnoredColumnsForVerification are excluded from the checksum, and their raw data is not returned.

Note that the MD5 hash should consist of at least one column: the paginationKey column. In other words, the MD5 hash should never be derived from an empty string.

func (*TableSchema) RowMd5Query

func (t *TableSchema) RowMd5Query() string

type TableSchemaCache

type TableSchemaCache map[string]*TableSchema

func LoadTables

func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig ColumnCompressionConfig, columnIgnoreConfig ColumnIgnoreConfig, cascadingPaginationColumnConfig *CascadingPaginationColumnConfig) (TableSchemaCache, error)

func (TableSchemaCache) AllTableNames

func (c TableSchemaCache) AllTableNames() (tableNames []string)

func (TableSchemaCache) AsSlice

func (c TableSchemaCache) AsSlice() (tables []*TableSchema)

func (TableSchemaCache) Get

func (c TableSchemaCache) Get(database, table string) *TableSchema

func (TableSchemaCache) GetTableCreationOrder

func (c TableSchemaCache) GetTableCreationOrder(db *sql.DB) (prioritzedTableNames []string, err error)

Helper to sort the given map of tables based on the foreign key dependencies between them.

func (TableSchemaCache) GetTableListWithPriority

func (c TableSchemaCache) GetTableListWithPriority(priorityList []string) (prioritzedTableNames []string)

Helper to sort a given map of tables using a second list that assigns priority. Items present in both the input and the priority list appear first, in the order of the priority list; all other items appear in the order given in the input.

type TableStatusDeprecated

type TableStatusDeprecated struct {
	TableName                   string
	PaginationKeyName           string
	Status                      string
	LastSuccessfulPaginationKey string
	TargetPaginationKey         string
}

type Throttler

type Throttler interface {
	Throttled() bool
	Disabled() bool
	SetDisabled(bool)
	SetPaused(bool)
	Run(context.Context) error
}

type ThrottlerBase

type ThrottlerBase struct {
	// contains filtered or unexported fields
}

func (*ThrottlerBase) Disabled

func (t *ThrottlerBase) Disabled() bool

func (*ThrottlerBase) SetDisabled

func (t *ThrottlerBase) SetDisabled(disabled bool)

type TimerMetric

type TimerMetric struct {
	MetricBase
	Value time.Duration
}

type TruncateTableBatch

type TruncateTableBatch struct {
	// contains filtered or unexported fields
}

func NewTruncateTableBatch

func NewTruncateTableBatch(table *TableSchema) *TruncateTableBatch

func (*TruncateTableBatch) AsSQLQuery

func (e *TruncateTableBatch) AsSQLQuery(schemaName, tableName string) (string, []interface{}, error)

func (*TruncateTableBatch) IsTableComplete

func (e *TruncateTableBatch) IsTableComplete() bool

func (*TruncateTableBatch) Size

func (e *TruncateTableBatch) Size() int

func (*TruncateTableBatch) TableSchema

func (e *TruncateTableBatch) TableSchema() *TableSchema

type UnsupportedCompressionError

type UnsupportedCompressionError struct {
	// contains filtered or unexported fields
}

UnsupportedCompressionError is used to identify errors resulting from attempts to decompress data with unsupported algorithms.

func (UnsupportedCompressionError) Error

type VerificationResult

type VerificationResult struct {
	DataCorrect     bool
	Message         string
	IncorrectTables []string
}

func NewCorrectVerificationResult

func NewCorrectVerificationResult() VerificationResult

func (VerificationResult) Error

func (e VerificationResult) Error() string

type VerificationResultAndStatus

type VerificationResultAndStatus struct {
	VerificationResult

	StartTime time.Time
	DoneTime  time.Time
}

func (VerificationResultAndStatus) IsDone

func (r VerificationResultAndStatus) IsDone() bool

func (VerificationResultAndStatus) IsStarted

func (r VerificationResultAndStatus) IsStarted() bool

type Verifier

type Verifier interface {
	// If the Verifier needs to do anything immediately after the DataIterator
	// finishes copying data and before cutover occurs, implement this function.
	VerifyBeforeCutover() error

	// This is called during cutover and should give the result of the
	// verification.
	VerifyDuringCutover() (VerificationResult, error)

	// Start the verifier in the background during the cutover phase.
	// Traditionally, this is called from within the ControlServer.
	//
	// This method may be called multiple times and it's up to the verifier
	// to decide if it is possible to re-run the verification.
	StartInBackground() error

	// Wait for the verifier until it finishes verification after it was
	// started with the StartInBackground.
	//
	// A verification is "done" when it verified the dbs (either
	// correct or incorrect) OR when it experiences an error.
	Wait()

	// Returns the result and the status of the verification.
	// To check the status, call IsStarted() and IsDone() on
	// VerificationResultAndStatus.
	//
	// If the verification has been completed successfully (without errors) and
	// the data checks out to be "correct", the result will be
	// VerificationResult{DataCorrect: true}, with error = nil.
	// Otherwise, the result will be
	// VerificationResult{DataCorrect: false, Message: "message"}, with
	// error = nil.
	//
	// If the verification is "done" but experienced an error during the check,
	// the result will be VerificationResult{}, with err = yourErr.
	Result() (VerificationResultAndStatus, error)
}

The sole purpose of this interface is to make it easier for one to implement their own strategy for verification and hook it up with the ControlServer. If there is no such need, one does not need to implement this interface.
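For illustration, here is a minimal custom verifier written against local copies of the documented types. The alwaysCorrectVerifier type is hypothetical and performs no real checks. One part of this sketch is an assumption: IsStarted and IsDone are defined here as "the corresponding timestamp is non-zero", since the real implementations are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Local copies of the documented types.
type VerificationResult struct {
	DataCorrect     bool
	Message         string
	IncorrectTables []string
}

type VerificationResultAndStatus struct {
	VerificationResult
	StartTime time.Time
	DoneTime  time.Time
}

// Assumption: started/done are signalled by non-zero timestamps.
func (r VerificationResultAndStatus) IsStarted() bool { return !r.StartTime.IsZero() }
func (r VerificationResultAndStatus) IsDone() bool    { return !r.DoneTime.IsZero() }

type Verifier interface {
	VerifyBeforeCutover() error
	VerifyDuringCutover() (VerificationResult, error)
	StartInBackground() error
	Wait()
	Result() (VerificationResultAndStatus, error)
}

// alwaysCorrectVerifier is hypothetical: it reports the data as correct.
type alwaysCorrectVerifier struct {
	mu     sync.Mutex
	wg     sync.WaitGroup
	result VerificationResultAndStatus
}

func (v *alwaysCorrectVerifier) VerifyBeforeCutover() error { return nil }

func (v *alwaysCorrectVerifier) VerifyDuringCutover() (VerificationResult, error) {
	return VerificationResult{DataCorrect: true}, nil
}

// StartInBackground may be called again; each call resets and reruns the check.
func (v *alwaysCorrectVerifier) StartInBackground() error {
	v.mu.Lock()
	v.result = VerificationResultAndStatus{StartTime: time.Now()}
	v.mu.Unlock()
	v.wg.Add(1)
	go func() {
		defer v.wg.Done()
		v.mu.Lock()
		defer v.mu.Unlock()
		v.result.VerificationResult = VerificationResult{DataCorrect: true}
		v.result.DoneTime = time.Now()
	}()
	return nil
}

func (v *alwaysCorrectVerifier) Wait() { v.wg.Wait() }

func (v *alwaysCorrectVerifier) Result() (VerificationResultAndStatus, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	return v.result, nil
}

var _ Verifier = (*alwaysCorrectVerifier)(nil)

func main() {
	var v Verifier = &alwaysCorrectVerifier{}
	_ = v.StartInBackground()
	v.Wait()
	res, _ := v.Result()
	fmt.Println(res.IsStarted(), res.IsDone(), res.DataCorrect) // true true true
}
```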

type WaitUntilReplicaIsCaughtUpToMaster

type WaitUntilReplicaIsCaughtUpToMaster struct {
	MasterDB                        *sql.DB
	ReplicatedMasterPositionFetcher ReplicatedMasterPositionFetcher
	Timeout                         time.Duration

	ReplicaDB *sql.DB
	// contains filtered or unexported fields
}

Only set the MasterDB and ReplicatedMasterPositionFetcher fields in your code, as the others will be overwritten by the ferry.

func (*WaitUntilReplicaIsCaughtUpToMaster) IsCaughtUp

func (w *WaitUntilReplicaIsCaughtUpToMaster) IsCaughtUp(targetMasterPos mysql.Position, retries int) (bool, error)

func (*WaitUntilReplicaIsCaughtUpToMaster) Wait

type WorkerPool

type WorkerPool struct {
	Concurrency int
	Process     func(int) (interface{}, error)
}

func (*WorkerPool) Run

func (p *WorkerPool) Run(n int) ([]interface{}, error)

Returns a list of results the same size as the concurrency number. Returns the first error that occurs during the run. Also, as soon as a single worker errors, all workers terminate.

Directories

Path Synopsis
cmd command
cmd command
cmd command
test
lib/go command
