Versions in this module Expand all Collapse all v1 v1.2.0 Feb 6, 2026 Changes in this version + const CompressionSnappy + const DefaultMarginalia + const DefaultNet + const MismatchColumnMissingOnSource + const MismatchColumnMissingOnTarget + const MismatchColumnValueDifference + const MismatchRowChecksumDifference + const MismatchRowMissingOnSource + const MismatchRowMissingOnTarget + const MySQLNumParamsLimit + const StateCopying + const StateCutover + const StateDone + const StateStarting + const StateVerifyBeforeCutover + const StateWaitingForCutover + const TableActionCompleted + const TableActionCopying + const TableActionWaiting + const VerifierTypeChecksumTable + const VerifierTypeInline + const VerifierTypeIterative + const VerifierTypeNoVerification + var VersionString string = "?.?.?+??????????????+???????" + var WebUiBasedir string = "" + func CheckDbIsAReplica(db *sql.DB) (bool, error) + func ConvertTableColumnsToStrings(columns []schema.TableColumn) []string + func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey PaginationKey, ...) squirrel.SelectBuilder + func GetMd5HashesSql(schema, table, paginationKeyColumn string, columns []schema.TableColumn, ...) (string, []interface{}, error) + func Int64Value(value interface{}) (int64, bool) + func MarshalPaginationKey(k PaginationKey) ([]byte, error) + func MaskedDSN(c *mysql.Config) string + func NewMysqlPosition(file string, position uint32, err error, db *sql.DB) (mysql.Position, error) + func NonBinaryCollationError(schema, table, paginationKey, collation string) error + func NonExistingPaginationKeyColumnError(schema, table, paginationKey string) error + func NonExistingPaginationKeyError(schema, table string) error + func NonNumericPaginationKeyError(schema, table, paginationKey string) error + func QuoteField(field string) string + func QuoteFields(fields []string) (out []string) + func QuotedTableName(table *TableSchema) string + func QuotedTableNameFromString(database, table string) string + func ScanByteRow(rows *sqlorig.Rows, columnCount int) ([][]byte, error) + func ShowMasterStatusBinlogPosition(db *sql.DB) (mysql.Position, error) + func TargetToSourceRewrites(databaseRewrites map[string]string) (map[string]string, error) + func Uint64Value(value interface{}) (uint64, bool) + func WaitForThrottle(t Throttler) + func WithRetries(maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, ...) (err error) + func WithRetriesContext(ctx context.Context, maxRetries int, sleep time.Duration, logger *logrus.Entry, ...) (err error) + type AtomicBoolean int32 + func (a *AtomicBoolean) Get() bool + func (a *AtomicBoolean) Set(b bool) + type BatchWriter struct + DB *sql.DB + DatabaseRewrites map[string]string + EnforceInlineVerification bool + InlineVerifier *InlineVerifier + StateTracker *StateTracker + TableRewrites map[string]string + WriteRetries int + func (w *BatchWriter) Initialize() + func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error + type BatchWriterVerificationFailed struct + func (e BatchWriterVerificationFailed) Error() string + type BinaryKey []byte + func NewBinaryKey(value []byte) BinaryKey + func (k BinaryKey) Compare(other PaginationKey) int + func (k BinaryKey) IsMax() bool + func (k BinaryKey) MarshalJSON() ([]byte, error) + func (k BinaryKey) NumericPosition() float64 + func (k BinaryKey) SQLValue() interface{} + func (k BinaryKey) String() string + type BinlogDeleteEvent struct + func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, error) + func (e *BinlogDeleteEvent) NewValues() RowData + func (e *BinlogDeleteEvent) OldValues() RowData + func (e *BinlogDeleteEvent) PaginationKey() (string, error) + type BinlogEventState struct + type BinlogInsertEvent struct + func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error) + func (e *BinlogInsertEvent) NewValues() RowData + func (e *BinlogInsertEvent) OldValues() RowData + func (e *BinlogInsertEvent) PaginationKey() (string, error) + type BinlogStreamer struct + DB *sql.DB + DBConfig *DatabaseConfig + DatabaseRewrites map[string]string + ErrorHandler ErrorHandler + Filter CopyFilter + LogTag string + MyServerId uint32 + TableRewrites map[string]string + TableSchema TableSchemaCache + func (s *BinlogStreamer) AddBinlogEventHandler(evType replication.EventType, ...) error + func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error) + func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error) + func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error) + func (s *BinlogStreamer) FlushAndStop() + func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position + func (s *BinlogStreamer) IsAlmostCaughtUp() bool + func (s *BinlogStreamer) Run() + type BinlogUpdateEvent struct + func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, error) + func (e *BinlogUpdateEvent) NewValues() RowData + func (e *BinlogUpdateEvent) OldValues() RowData + func (e *BinlogUpdateEvent) PaginationKey() (string, error) + type BinlogVerifyBatch struct + PaginationKeys []interface{} + SchemaName string + TableName string + type BinlogVerifySerializedStore map[string]map[string]map[string]int + func (s BinlogVerifySerializedStore) Copy() BinlogVerifySerializedStore + func (s BinlogVerifySerializedStore) EntriesCount() uint64 + func (s BinlogVerifySerializedStore) RowCount() uint64 + type BinlogVerifyStore struct + EmitLogPerRowsAdded uint64 + func NewBinlogVerifyStore() *BinlogVerifyStore + func NewBinlogVerifyStoreFromSerialized(serialized BinlogVerifySerializedStore) *BinlogVerifyStore + func (s *BinlogVerifyStore) Add(table *TableSchema, paginationKey string) + func (s *BinlogVerifyStore) Batches(batchsize int) []BinlogVerifyBatch + func (s *BinlogVerifyStore) CurrentEntriesCount() uint64 + func (s *BinlogVerifyStore) CurrentRowCount() uint64 + func (s *BinlogVerifyStore) RemoveVerifiedBatch(batch BinlogVerifyBatch) + func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore + type BinlogWriter struct + BatchSize int + DB *sql.DB + DatabaseRewrites map[string]string + ErrorHandler ErrorHandler + StateTracker *StateTracker + TableRewrites map[string]string + Throttler Throttler + WriteRetries int + func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error + func (b *BinlogWriter) Run() + func (b *BinlogWriter) Stop() + type CascadingPaginationColumnConfig struct + FallbackColumn string + PerTable map[string]map[string]string + func (c *CascadingPaginationColumnConfig) FallbackPaginationColumnName() (string, bool) + func (c *CascadingPaginationColumnConfig) PaginationColumnFor(schemaName, tableName string) (string, bool) + type ChecksumTableVerifier struct + DatabaseRewrites map[string]string + SourceDB *sql.DB + TableRewrites map[string]string + Tables []*TableSchema + TargetDB *sql.DB + func (v *ChecksumTableVerifier) Message() string + func (v *ChecksumTableVerifier) Result() (VerificationResultAndStatus, error) + func (v *ChecksumTableVerifier) StartInBackground() error + func (v *ChecksumTableVerifier) VerifyBeforeCutover() error + func (v *ChecksumTableVerifier) VerifyDuringCutover() (VerificationResult, error) + func (v *ChecksumTableVerifier) Wait() + type ColumnCompressionConfig map[string]map[string]map[string]string + func (c ColumnCompressionConfig) CompressedColumnsFor(schemaName, tableName string) map[string]string + type ColumnIgnoreConfig map[string]map[string]map[string]struct + func (c ColumnIgnoreConfig) IgnoredColumnsFor(schemaName, tableName string) map[string]struct{} + type CompressionVerifier struct + TableSchemaCache TableSchemaCache + func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig, ...) (*CompressionVerifier, error) + func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error) + func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schemaName, tableName, paginationKeyColumn string, ...) (map[string][]byte, error) + func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error) + func (c *CompressionVerifier) IsCompressedTable(table string) bool + type Config struct + AllowSuperUserOnReadOnly bool + AutomaticCutover bool + BinlogEventBatchSize int + CascadingPaginationColumnConfig *CascadingPaginationColumnConfig + CompressedColumnsForVerification ColumnCompressionConfig + ControlServerConfig *ControlServerConfig + ControlServerCustomScripts map[string][]string + CopyFilter CopyFilter + CutoverLock HTTPCallback + CutoverRetryWaitSeconds int + CutoverUnlock HTTPCallback + DBReadRetries int + DBWriteRetries int + DataIterationBatchSize uint64 + DataIterationBatchSizePerTableOverride *DataIterationBatchSizePerTableOverride + DataIterationConcurrency int + DatabaseRewrites map[string]string + DoNotIncludeSchemaCacheInStateDump bool + DumpStateOnSignal bool + DumpStateToStdoutOnError bool + EnablePProf bool + EnableRowBatchSize bool + ErrorCallback HTTPCallback + ForceIndexForVerification ForceIndexConfig + IgnoredColumnsForVerification ColumnIgnoreConfig + InlineVerifierConfig InlineVerifierConfig + IterativeVerifierConfig IterativeVerifierConfig + MaxCutoverRetries int + MyServerId uint32 + ProgressCallback HTTPCallback + ProgressReportFrequency int + ServerBindAddr string + SkipBinlogRowImageCheck bool + SkipForeignKeyConstraintsCheck bool + SkipTargetVerification bool + Source *DatabaseConfig + StateCallback HTTPCallback + StateReportFrequency int + StateToResumeFrom *SerializableState + TableFilter TableFilter + TableRewrites map[string]string + Target *DatabaseConfig + UpdatableConfig UpdatableConfig + VerifierType string + WebBasedir string + func (c *Config) Update(updatedConfig UpdatableConfig) + func (c *Config) ValidateConfig() error + type ControlServer struct + Config *ControlServerConfig + F *Ferry + Verifier Verifier + func (this *ControlServer) HandleConfigGet(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleConfigPost(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleScript(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleStatus(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) Initialize() (err error) + func (this *ControlServer) Run() + func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request) + func (this *ControlServer) Wait() + type ControlServerConfig struct + CustomScripts map[string][]string + Enabled bool + ServerBindAddr string + WebBasedir string + func (c *ControlServerConfig) Validate() error + type ControlServerStatus struct + AllDatabaseNames []string + AllTableNames []string + AutomaticCutover bool + BinlogStreamerLag time.Duration + BinlogStreamerStopRequested bool + CompletedTableCount int + CurrentTime time.Time + CustomScriptStatuses map[string]CustomScriptStatus + ETA time.Duration + GhostferryVersion string + OverallState string + SourceHostPort string + StartTime time.Time + TableStatuses []*ControlServerTableStatus + TargetHostPort string + TimeTaken time.Duration + TotalTableCount int + VerificationDone bool + VerificationErr error + VerificationResult VerificationResult + VerificationStarted bool + VerifierAvailable bool + VerifierSupport bool + type ControlServerTableStatus struct + BatchSize uint64 + LastSuccessfulPaginationKey uint64 + PaginationKeyName string + Status string + TableName string + TargetPaginationKey uint64 + type CopyFilter interface + ApplicableEvent func(DMLEvent) (bool, error) + BuildSelect func([]string, *TableSchema, PaginationKey, uint64) (sq.SelectBuilder, error) + type CountMetric struct + Value int64 + type Cursor struct + MaxPaginationKey PaginationKey + RowLock bool + Table *TableSchema + func (c *Cursor) Each(f func(*RowBatch) error) error + func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos PaginationKey, err error) + type CursorConfig struct + BatchSize *uint64 + BatchSizePerTableOverride *DataIterationBatchSizePerTableOverride + BuildSelect func([]string, *TableSchema, PaginationKey, uint64) (squirrel.SelectBuilder, error) + ColumnsToSelect []string + DB *sql.DB + ReadRetries int + Throttler Throttler + func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey PaginationKey) *Cursor + func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey PaginationKey) *Cursor + func (c CursorConfig) GetBatchSize(schemaName string, tableName string) uint64 + type CustomScriptStatus struct + ExitCode int + Logs string + Name string + Running bool + Status string + type DMLEvent interface + Annotation func() (string, error) + AsSQLString func(string, string) (string, error) + BinlogPosition func() mysql.Position + Database func() string + NewValues func() RowData + OldValues func() RowData + PaginationKey func() (string, error) + ResumableBinlogPosition func() mysql.Position + Table func() string + TableSchema func() *TableSchema + Timestamp func() time.Time + func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, ...) ([]DMLEvent, error) + func NewBinlogDeleteEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) + func NewBinlogInsertEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) + func NewBinlogUpdateEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error) + type DMLEventBase struct + func NewDMLEventBase(table *TableSchema, pos, resumablePos mysql.Position, query []byte, ...) *DMLEventBase + func (e *DMLEventBase) Annotation() (string, error) + func (e *DMLEventBase) BinlogPosition() mysql.Position + func (e *DMLEventBase) Database() string + func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position + func (e *DMLEventBase) Table() string + func (e *DMLEventBase) TableSchema() *TableSchema + func (e *DMLEventBase) Timestamp() time.Time + type DataIterationBatchSizePerTableOverride struct + ControlPoints map[int]uint64 + MaxRowSize int + MinRowSize int + TableOverride map[string]map[string]uint64 + func (d *DataIterationBatchSizePerTableOverride) CalculateBatchSize(rowSize int) int + func (d *DataIterationBatchSizePerTableOverride) UpdateBatchSizes(db *sql.DB, tables TableSchemaCache) error + func (d *DataIterationBatchSizePerTableOverride) Validate() error + type DataIterator struct + Concurrency int + CursorConfig *CursorConfig + DB *sql.DB + ErrorHandler ErrorHandler + SelectFingerprint bool + StateTracker *StateTracker + TableSorter DataIteratorSorter + TargetPaginationKeys *sync.Map + func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error) + func (d *DataIterator) AddDoneListener(listener func() error) + func (d *DataIterator) Run(tables []*TableSchema) + type DataIteratorSorter interface + Sort func(unorderedTables map[*TableSchema]PaginationKey) ([]TableMaxPaginationKey, error) + type DatabaseConfig struct + Collation string + Host string + Marginalia string + Net string + Params map[string]string + Pass string + Port uint16 + ReadTimeout uint64 + TLS *TLSConfig + User string + WriteTimeout uint64 + func (c *DatabaseConfig) MySQLConfig() (*mysql.Config, error) + func (c *DatabaseConfig) SqlDB(logger *logrus.Entry) (*sql.DB, error) + func (c *DatabaseConfig) Validate() error + type ErrorHandler interface + Fatal func(from string, err error) + ReportError func(from string, err error) + type Ferry struct + BatchWriter *BatchWriter + BinlogStreamer *BinlogStreamer + BinlogWriter *BinlogWriter + ControlServer *ControlServer + DataIterator *DataIterator + DoneTime time.Time + ErrorHandler ErrorHandler + OverallState atomic.Value + SourceDB *sql.DB + StartTime time.Time + StateTracker *StateTracker + Tables TableSchemaCache + TargetDB *sql.DB + TargetVerifier *TargetVerifier + Throttler Throttler + Verifier Verifier + WaitUntilReplicaIsCaughtUpToMaster *WaitUntilReplicaIsCaughtUpToMaster + func (f *Ferry) EndCutover(cutoverStart time.Time) + func (f *Ferry) FlushBinlogAndStopStreaming() + func (f *Ferry) Initialize() (err error) + func (f *Ferry) NewBatchWriter() *BatchWriter + func (f *Ferry) NewBatchWriterWithoutStateTracker() *BatchWriter + func (f *Ferry) NewBinlogWriter() *BinlogWriter + func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter + func (f *Ferry) NewChecksumTableVerifier() *ChecksumTableVerifier + func (f *Ferry) NewControlServer() (*ControlServer, error) + func (f *Ferry) NewDataIterator() *DataIterator + func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator + func (f *Ferry) NewInlineVerifier() *InlineVerifier + func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier + func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error) + func (f *Ferry) NewSourceBinlogStreamer() *BinlogStreamer + func (f *Ferry) NewTargetBinlogStreamer() (*BinlogStreamer, error) + func (f *Ferry) Progress() *Progress + func (f *Ferry) ReportProgress() + func (f *Ferry) ReportState() + func (f *Ferry) Run() + func (f *Ferry) RunStandaloneDataCopy(tables []*TableSchema) error + func (f *Ferry) SerializeStateToJSON() (string, error) + func (f *Ferry) Start() error + func (f *Ferry) StartCutover() time.Time + func (f *Ferry) StopTargetVerifier() + func (f *Ferry) WaitUntilBinlogStreamerCatchesUp() + func (f *Ferry) WaitUntilRowCopyIsComplete() + type ForceIndexConfig map[string]map[string]string + func (c ForceIndexConfig) IndexFor(schemaName, tableName string) string + type GaugeMetric struct + Value float64 + type HTTPCallback struct + Payload string + URI string + func (h HTTPCallback) Post(client *http.Client) error + type IncompleteVerificationError struct + func (e IncompleteVerificationError) Error() string + type InlineVerifier struct + BatchSize int + DatabaseRewrites map[string]string + ErrorHandler ErrorHandler + MaxExpectedDowntime time.Duration + SourceDB *sql.DB + StateTracker *StateTracker + TableRewrites map[string]string + TableSchemaCache TableSchemaCache + TargetDB *sql.DB + VerifyBinlogEventsInterval time.Duration + func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, ...) ([]InlineVerifierMismatches, error) + func (v *InlineVerifier) Message() string + func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context) + func (v *InlineVerifier) Result() (VerificationResultAndStatus, error) + func (v *InlineVerifier) StartInBackground() error + func (v *InlineVerifier) VerifyBeforeCutover() error + func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error) + func (v *InlineVerifier) Wait() + type InlineVerifierConfig struct + MaxExpectedDowntime string + VerifyBinlogEventsInterval string + func (c *InlineVerifierConfig) Validate() error + type InlineVerifierMismatches struct + MismatchColumn string + MismatchType mismatchType + Pk string + SourceChecksum string + TargetChecksum string + type IterativeVerifier struct + BinlogStreamer *BinlogStreamer + CompressionVerifier *CompressionVerifier + Concurrency int + CursorConfig *CursorConfig + DatabaseRewrites map[string]string + IgnoredColumns map[string]map[string]struct{} + IgnoredTables []string + MaxExpectedDowntime time.Duration + SourceDB *sql.DB + TableRewrites map[string]string + TableSchemaCache TableSchemaCache + Tables []*TableSchema + TargetDB *sql.DB + func (v *IterativeVerifier) GetHashes(db *sql.DB, schemaName, tableName, paginationKeyColumn string, ...) (map[string][]byte, error) + func (v *IterativeVerifier) Initialize() error + func (v *IterativeVerifier) Message() string + func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error) + func (v *IterativeVerifier) SanityCheckParameters() error + func (v *IterativeVerifier) StartInBackground() error + func (v *IterativeVerifier) VerifyBeforeCutover() error + func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error) + func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error) + func (v *IterativeVerifier) Wait() + type IterativeVerifierConfig struct + Concurrency int + IgnoredColumns map[string][]string + IgnoredTables []string + MaxExpectedDowntime string + TableColumnCompression TableColumnCompressionConfig + func (c *IterativeVerifierConfig) Validate() error + type LagThrottler struct + DB *sql.DB + func NewLagThrottler(config *LagThrottlerConfig) (*LagThrottler, error) + func (t *LagThrottler) Run(ctx context.Context) error + func (t *LagThrottler) Throttled() bool + type LagThrottlerConfig struct + Connection *DatabaseConfig + MaxLag int + Query string + UpdateInterval string + type MaxPaginationKeySorter struct + func (s *MaxPaginationKeySorter) Sort(unorderedTables map[*TableSchema]PaginationKey) ([]TableMaxPaginationKey, error) + type MaxTableSizeSorter struct + DataIterator *DataIterator + func (s *MaxTableSizeSorter) Sort(unorderedTables map[*TableSchema]PaginationKey) ([]TableMaxPaginationKey, error) + type MetricBase struct + Key string + SampleRate float64 + Tags []MetricTag + type MetricTag struct + Name string + Value string + type Metrics struct + DefaultTags []MetricTag + Prefix string + Sink chan interface{} + func SetGlobalMetrics(prefix string, sink chan interface{}) *Metrics + func (m *Metrics) AddConsumer() + func (m *Metrics) Count(key string, value int64, tags []MetricTag, sampleRate float64) + func (m *Metrics) DoneConsumer() + func (m *Metrics) Gauge(key string, value float64, tags []MetricTag, sampleRate float64) + func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f func()) + func (m *Metrics) StopAndFlush() + func (m *Metrics) Timer(key string, duration time.Duration, tags []MetricTag, sampleRate float64) + type PaginationKey interface + Compare func(other PaginationKey) int + IsMax func() bool + MarshalJSON func() ([]byte, error) + NumericPosition func() float64 + SQLValue func() interface{} + String func() string + func MaxPaginationKey(column *schema.TableColumn) PaginationKey + func MinPaginationKey(column *schema.TableColumn) PaginationKey + func NewPaginationKeyFromRow(rowData RowData, index int, column *schema.TableColumn) (PaginationKey, error) + func UnmarshalPaginationKey(data []byte) (PaginationKey, error) + type PaginationKeyPositionLog struct + At time.Time + Position float64 + type PanicErrorHandler struct + DumpStateToStdoutOnError bool + ErrorCallback HTTPCallback + Ferry *Ferry + func (this *PanicErrorHandler) Fatal(from string, err error) + func (this *PanicErrorHandler) ReportError(from string, err error) + type PauserThrottler struct + func (t *PauserThrottler) Run(ctx context.Context) error + func (t *PauserThrottler) SetPaused(paused bool) + func (t *PauserThrottler) Throttled() bool + type Progress struct + ActiveDataIterators int + BinlogStreamerLag float64 + BinlogWriterLag float64 + CurrentState string + CustomPayload string + ETA float64 + FinalBinlogPos mysql.Position + LastSuccessfulBinlogPos mysql.Position + PaginationKeysPerSecond uint64 + Tables map[string]TableProgress + TargetBinlogStreamerLag float64 + Throttled bool + TimeTaken float64 + VerifierMessage string + VerifierType string + type ReplicatedMasterPositionFetcher interface + Current func(*sql.DB) (mysql.Position, error) + type ReplicatedMasterPositionViaCustomQuery struct + Query string + func (r ReplicatedMasterPositionViaCustomQuery) Current(replicaDB *sql.DB) (mysql.Position, error) + type ReverifyBatch struct + PaginationKeys []interface{} + Table TableIdentifier + type ReverifyEntry struct + PaginationKey string + Table *TableSchema + type ReverifyStore struct + BatchStore []ReverifyBatch + EmitLogPerRowCount uint64 + MapStore map[TableIdentifier]map[string]struct{} + RowCount uint64 + func NewReverifyStore() *ReverifyStore + func (r *ReverifyStore) Add(entry ReverifyEntry) + func (r *ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch + type RowBatch struct + func NewRowBatch(table *TableSchema, values []RowData, paginationKeyIndex int) *RowBatch + func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface{}, error) + func (e *RowBatch) EstimateByteSize() uint64 + func (e *RowBatch) Fingerprints() map[string][]byte + func (e *RowBatch) PaginationKeyIndex() int + func (e *RowBatch) Size() int + func (e *RowBatch) TableSchema() *TableSchema + func (e *RowBatch) Values() []RowData + func (e *RowBatch) ValuesContainPaginationKey() bool + type RowData []interface + func (r RowData) GetUint64(colIdx int) (uint64, error) + func ScanGenericRow(rows *sqlorig.Rows, columnCount int) (RowData, error) + type RowStats struct + NumBytes uint64 + NumRows uint64 + type SerializableState struct + BinlogVerifyStore BinlogVerifySerializedStore + CompletedTables map[string]bool + GhostferryVersion string + LastKnownTableSchemaCache TableSchemaCache + LastStoredBinlogPositionForInlineVerifier mysql.Position + LastStoredBinlogPositionForTargetVerifier mysql.Position + LastSuccessfulPaginationKeys map[string]PaginationKey + LastWrittenBinlogPosition mysql.Position + func (s *SerializableState) MarshalJSON() ([]byte, error) + func (s *SerializableState) MinSourceBinlogPosition() mysql.Position + func (s *SerializableState) UnmarshalJSON(data []byte) error + type SqlDBWithFakeRollback struct + func (d *SqlDBWithFakeRollback) Rollback() error + type SqlPreparer interface + Prepare func(string) (*sqlorig.Stmt, error) + type SqlPreparerAndRollbacker interface + Rollback func() error + type StateTracker struct + BinlogRWMutex *sync.RWMutex + CopyRWMutex *sync.RWMutex + func NewStateTracker(speedLogCount int) *StateTracker + func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *SerializableState) *StateTracker + func (s *StateTracker) EstimatedPaginationKeysPerSecond() float64 + func (s *StateTracker) IsTableComplete(table string) bool + func (s *StateTracker) LastSuccessfulPaginationKey(table string, tableSchema *TableSchema) PaginationKey + func (s *StateTracker) MarkTableAsCompleted(table string) + func (s *StateTracker) RowStatsWrittenPerTable() map[string]RowStats + func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, ...) *SerializableState + func (s *StateTracker) UpdateLastResumableBinlogPositionForTargetVerifier(pos mysql.Position) + func (s *StateTracker) UpdateLastResumableSourceBinlogPosition(pos mysql.Position) + func (s *StateTracker) UpdateLastResumableSourceBinlogPositionForInlineVerifier(pos mysql.Position) + func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginationKey PaginationKey, rowStats RowStats) + type StmtCache struct + func NewStmtCache() *StmtCache + func (c *StmtCache) StmtFor(p SqlPreparer, query string) (*sqlorig.Stmt, error) + type TLSConfig struct + CertPath string + ServerName string + func (this *TLSConfig) BuildConfig() (*tls.Config, error) + type TableColumnCompressionConfig map[string]map[string]string + type TableFilter interface + ApplicableDatabases func([]string) ([]string, error) + ApplicableTables func([]*TableSchema) ([]*TableSchema, error) + type TableIdentifier struct + SchemaName string + TableName string + func NewTableIdentifierFromSchemaTable(table *TableSchema) TableIdentifier + type TableMaxPaginationKey struct + MaxPaginationKey PaginationKey + Table *TableSchema + type TableProgress struct + BatchSize uint64 + BytesWritten uint64 + CurrentAction string + LastSuccessfulPaginationKey uint64 + RowsWritten uint64 + TargetPaginationKey uint64 + type TableSchema struct + CompressedColumnsForVerification map[string]string + ForcedIndexForVerification string + IgnoredColumnsForVerification map[string]struct{} + PaginationKeyColumn *schema.TableColumn + PaginationKeyIndex int + func MaxPaginationKeys(db *sql.DB, tables []*TableSchema, logger *logrus.Entry) (map[*TableSchema]PaginationKey, []*TableSchema, error) + func (t *TableSchema) FingerprintQuery(schemaName, tableName string, numRows int) string + func (t *TableSchema) GetPaginationColumn() *schema.TableColumn + func (t *TableSchema) GetPaginationKeyIndex() int + func (t *TableSchema) RowMd5Query() string + type TableSchemaCache map[string]*TableSchema + func LoadTables(db *sql.DB, tableFilter TableFilter, ...) (TableSchemaCache, error) + func (c TableSchemaCache) AllTableNames() (tableNames []string) + func (c TableSchemaCache) AsSlice() (tables []*TableSchema) + func (c TableSchemaCache) Get(database, table string) *TableSchema + func (c TableSchemaCache) GetTableListWithPriority(priorityList []string) (prioritizedTableNames []string) + type TargetVerifier struct + BinlogStreamer *BinlogStreamer + DB *sql.DB + StateTracker *StateTracker + func NewTargetVerifier(targetDB *sql.DB, stateTracker *StateTracker, binlogStreamer *BinlogStreamer) (*TargetVerifier, error) + func (t *TargetVerifier) BinlogEventListener(evs []DMLEvent) error + type Throttler interface + Disabled func() bool + Run func(context.Context) error + SetDisabled func(bool) + SetPaused func(bool) + Throttled func() bool + type ThrottlerBase struct + func (t *ThrottlerBase) Disabled() bool + func (t *ThrottlerBase) SetDisabled(disabled bool) + type TimerMetric struct + Value time.Duration + type Uint64Key uint64 + func NewUint64Key(value uint64) Uint64Key + func (k Uint64Key) Compare(other PaginationKey) int + func (k Uint64Key) IsMax() bool + func (k Uint64Key) MarshalJSON() ([]byte, error) + func (k Uint64Key) NumericPosition() float64 + func (k Uint64Key) SQLValue() interface{} + func (k Uint64Key) String() string + type UnsupportedCompressionError struct + func (e UnsupportedCompressionError) Error() string + type UpdatableConfig struct + DataIterationBatchSize uint64 + func (c *UpdatableConfig) Validate() error + type VerificationResult struct + DataCorrect bool + IncorrectTables []string + Message string + func NewCorrectVerificationResult() VerificationResult + func (e VerificationResult) Error() string + type VerificationResultAndStatus struct + DoneTime time.Time + StartTime time.Time + func (r VerificationResultAndStatus) IsDone() bool + func (r VerificationResultAndStatus) IsStarted() bool + type Verifier interface + Message func() string + Result func() (VerificationResultAndStatus, error) + StartInBackground func() error + VerifyBeforeCutover func() error + VerifyDuringCutover func() (VerificationResult, error) + Wait func() + type WaitUntilReplicaIsCaughtUpToMaster struct + MasterDB *sql.DB + ReplicaDB *sql.DB + ReplicatedMasterPositionFetcher ReplicatedMasterPositionFetcher + Timeout time.Duration + func (w *WaitUntilReplicaIsCaughtUpToMaster) IsCaughtUp(targetMasterPos mysql.Position, retries int) (bool, error) + func (w *WaitUntilReplicaIsCaughtUpToMaster) Wait() error + type WorkerPool struct + Concurrency int + Process func(int) (interface{}, error) + func (p *WorkerPool) Run(n int) ([]interface{}, error)