logic

package
v1.1.8-rc
Published: Dec 5, 2025 License: MIT Imports: 34 Imported by: 28

Documentation

Index

Constants

const (
	NoPrintStatusRule           PrintStatusRule = iota
	HeuristicPrintStatusRule                    = iota
	ForcePrintStatusRule                        = iota
	ForcePrintStatusOnlyRule                    = iota
	ForcePrintStatusAndHintRule                 = iota
)
const (
	EventsChannelBufferSize       = 1
	ReconnectStreamerSleepSeconds = 1
)
const (
	GhostChangelogTableComment = "gh-ost changelog"
)

Variables

var (
	ErrMigratorUnsupportedRenameAlter = errors.New("ALTER statement seems to RENAME the table. This is not supported, and you should run your RENAME outside gh-ost.")
	ErrMigrationNotAllowedOnMaster    = errors.New("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (this reduces load from the master). To proceed please provide --allow-on-master.")
	RetrySleepFn                      = time.Sleep
)
var (
	ErrCPUProfilingBadOption  = errors.New("unrecognized cpu profiling option")
	ErrCPUProfilingInProgress = errors.New("cpu profiling already in progress")
)
var ErrNoCheckpointFound = errors.New("no checkpoint found in _ghk table")

ErrNoCheckpointFound is returned when an empty checkpoint table is queried.
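A sentinel error like this is usually matched with `errors.Is`, which keeps working even if the error has been wrapped with extra context. The sketch below is a minimal illustration only: `errNoCheckpointFound` is a local stand-in for the package variable, and `readLastCheckpoint` and `resumeOrStartFresh` are hypothetical helpers, not part of gh-ost. The "start fresh" reaction is an assumed caller policy.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoCheckpointFound is a local stand-in for logic.ErrNoCheckpointFound,
// declared here only so the sketch compiles without the gh-ost module.
var errNoCheckpointFound = errors.New("no checkpoint found in _ghk table")

// readLastCheckpoint is a hypothetical reader: it reports the sentinel,
// wrapped with context, when the checkpoint table holds no rows.
func readLastCheckpoint(rows int) (int64, error) {
	if rows == 0 {
		return 0, fmt.Errorf("reading checkpoint: %w", errNoCheckpointFound)
	}
	return int64(rows), nil
}

// resumeOrStartFresh shows one possible caller policy (an assumption):
// an empty checkpoint table is not fatal, any other error is.
func resumeOrStartFresh(rows int) (string, error) {
	_, err := readLastCheckpoint(rows)
	switch {
	case errors.Is(err, errNoCheckpointFound):
		return "start fresh", nil
	case err != nil:
		return "", err
	}
	return "resume", nil
}

func main() {
	for _, rows := range []int{0, 3} {
		decision, err := resumeOrStartFresh(rows)
		fmt.Println(rows, decision, err)
	}
}
```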

Functions

This section is empty.

Types

type Applier

type Applier struct {
	CurrentCoordinatesMutex sync.Mutex
	CurrentCoordinates      mysql.BinlogCoordinates

	LastIterationRangeMutex     sync.Mutex
	LastIterationRangeMinValues *sql.ColumnValues
	LastIterationRangeMaxValues *sql.ColumnValues
	// contains filtered or unexported fields
}

Applier connects and writes to the applier-server, which is the server where the migration happens. This is typically the master, but could be a replica when `--test-on-replica` or `--execute-on-replica` is given. Applier is the one to actually write row data and apply binlog events onto the ghost table. It is where the ghost & changelog tables get created. It is where the cut-over phase happens.

func NewApplier

func NewApplier(migrationContext *base.MigrationContext) *Applier

func (*Applier) AlterGhost

func (this *Applier) AlterGhost() error

AlterGhost applies `alter` statement on ghost table

func (*Applier) AlterGhostAutoIncrement added in v1.1.2

func (this *Applier) AlterGhostAutoIncrement() error

AlterGhostAutoIncrement applies an AUTO_INCREMENT `alter` statement on ghost table

func (*Applier) ApplyDMLEventQueries added in v1.0.34

func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error

ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table

func (*Applier) ApplyIterationInsertQuery

func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error)

ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where data actually gets copied from original table.

func (*Applier) AtomicCutOverMagicLock added in v0.9.8

func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, renameLockSessionId *int64) error

AtomicCutOverMagicLock

func (*Applier) AtomicCutoverRename added in v0.9.8

func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error

AtomicCutoverRename

func (*Applier) AttemptInstantDDL added in v1.1.6

func (this *Applier) AttemptInstantDDL() error

AttemptInstantDDL attempts to use instant DDL (from MySQL 8.0, and earlier in Aurora and some others). If successful, the operation is only a meta-data change so a lot of time is saved! The risk of attempting instant DDL when not supported is that a metadata lock may be acquired. This is minor, since gh-ost will eventually require a metadata lock anyway, but at the cut-over stage.

Instant operations include:
- Adding a column
- Dropping a column
- Dropping an index
- Extending a VARCHAR column
- Adding a virtual generated column

It is not reliable to parse the `alter` statement to determine if it is instant or not. This is because the table might be in an older row format, or have some other incompatibility that is difficult to identify.

func (*Applier) CalculateNextIterationRangeEndValues

func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error)

CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, which will be used for copying the next chunk of rows. It returns "false" if there is no further chunk to work through, i.e. we're past the last chunk and are done with iterating the range (and thus done with copying row chunks).
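The documented contract (compute the end of the next chunk, copy it, stop when no further range remains) can be sketched with an in-memory toy. Everything here is an assumption for illustration: `chunkCopier` is not the gh-ost Applier, the integer slice stands in for the unique key values of the original table, and no SQL is issued.

```go
package main

import "fmt"

// chunkCopier is a toy, in-memory stand-in for row-copy state. keys plays
// the role of the original table's unique key values, in ascending order.
type chunkCopier struct {
	keys      []int
	chunkSize int
	next      int // index of the first key not yet copied
	rangeMin  int // first key of the current chunk
	rangeMax  int // last key of the current chunk
	copied    []int
}

// calculateNextRangeEnd picks the bounds of the next chunk. It returns
// false once every key has been covered.
func (c *chunkCopier) calculateNextRangeEnd() (hasFurtherRange bool) {
	if c.next >= len(c.keys) {
		return false
	}
	size := c.chunkSize
	if size < 1 {
		size = 1 // guard against a non-positive chunk size
	}
	end := c.next + size
	if end > len(c.keys) {
		end = len(c.keys)
	}
	c.rangeMin, c.rangeMax = c.keys[c.next], c.keys[end-1]
	return true
}

// copyChunk copies every key up to rangeMax and reports how many rows moved.
func (c *chunkCopier) copyChunk() (rowsAffected int) {
	for c.next < len(c.keys) && c.keys[c.next] <= c.rangeMax {
		c.copied = append(c.copied, c.keys[c.next])
		c.next++
		rowsAffected++
	}
	return rowsAffected
}

// copyAll loops until no further range remains and returns the chunk count.
func copyAll(c *chunkCopier) (chunks int) {
	for c.calculateNextRangeEnd() {
		c.copyChunk()
		chunks++
	}
	return chunks
}

func main() {
	c := &chunkCopier{keys: []int{1, 2, 5, 8, 13, 21, 34}, chunkSize: 3}
	fmt.Println(copyAll(c), c.copied)
}
```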

func (*Applier) CreateAtomicCutOverSentryTable added in v0.9.8

func (this *Applier) CreateAtomicCutOverSentryTable() error

CreateAtomicCutOverSentryTable

func (*Applier) CreateChangelogTable

func (this *Applier) CreateChangelogTable() error

CreateChangelogTable creates the changelog table on the applier host

func (*Applier) CreateCheckpointTable

func (this *Applier) CreateCheckpointTable() error

CreateCheckpointTable creates the checkpoint table to store the chunk copy and applier state. There are two sets of columns with the same types as the shared unique key, one for IterationMinValues and one for IterationMaxValues.

func (*Applier) CreateGhostTable

func (this *Applier) CreateGhostTable() error

CreateGhostTable creates the ghost table on the applier host

func (*Applier) CreateTriggersOnGhost

func (this *Applier) CreateTriggersOnGhost() error

CreateTriggersOnGhost creates the original triggers on the applier host

func (*Applier) DropAtomicCutOverSentryTableIfExists added in v0.9.8

func (this *Applier) DropAtomicCutOverSentryTableIfExists() error

DropAtomicCutOverSentryTableIfExists checks if the "old" table name happens to be a cut-over magic table; if so, it drops it.

func (*Applier) DropChangelogTable

func (this *Applier) DropChangelogTable() error

DropChangelogTable drops the changelog table on the applier host

func (*Applier) DropCheckpointTable

func (this *Applier) DropCheckpointTable() error

DropCheckpointTable drops the checkpoint table on applier host

func (*Applier) DropGhostTable

func (this *Applier) DropGhostTable() error

DropGhostTable drops the ghost table on the applier host

func (*Applier) DropOldTable

func (this *Applier) DropOldTable() error

DropOldTable drops the _Old table on the applier host

func (*Applier) DropTriggersFromGhost

func (this *Applier) DropTriggersFromGhost() error

DropTriggersFromGhost drops the triggers on the applier host

func (*Applier) ExecuteThrottleQuery added in v0.9.6

func (this *Applier) ExecuteThrottleQuery() (int64, error)

ExecuteThrottleQuery executes the `--throttle-query` and returns its results.

func (*Applier) ExpectMetadataLock

func (this *Applier) ExpectMetadataLock(sessionId int64) error

func (*Applier) ExpectProcess

func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error

ExpectProcess expects a process to show up in `SHOW PROCESSLIST` that has given characteristics

func (*Applier) ExpectUsedLock added in v0.9.2

func (this *Applier) ExpectUsedLock(sessionId int64) error

ExpectUsedLock expects the special hint voluntary lock to exist on given session

func (*Applier) GetSessionLockName added in v0.9.2

func (this *Applier) GetSessionLockName(sessionId int64) string

GetSessionLockName returns a name for the special hint session voluntary lock

func (*Applier) InitAtomicCutOverWaitTimeout added in v1.1.7

func (this *Applier) InitAtomicCutOverWaitTimeout(tx *gosql.Tx) error

InitAtomicCutOverWaitTimeout sets the cut-over session wait_timeout in order to reduce the time an unresponsive (but still connected) gh-ost process can hold the cut-over lock.

func (*Applier) InitDBConnections

func (this *Applier) InitDBConnections() (err error)

func (*Applier) InitiateHeartbeat

func (this *Applier) InitiateHeartbeat()

InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. This is done asynchronously
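An asynchronous heartbeat cycle is commonly built from a goroutine and a ticker. The sketch below shows that general pattern only, under stated assumptions: `startHeartbeat` and `collectBeats` are hypothetical names, the `write` callback stands in for the changelog-table write, and the interval and stop signal are invented for the example rather than taken from gh-ost.

```go
package main

import (
	"fmt"
	"time"
)

// startHeartbeat launches a goroutine that calls write on every tick until
// stop is closed. The returned channel is closed once the goroutine exits.
func startHeartbeat(interval time.Duration, write func(time.Time), stop <-chan struct{}) <-chan struct{} {
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case t := <-ticker.C:
				write(t) // in a real migration this would be a table write
			case <-stop:
				return
			}
		}
	}()
	return finished
}

// collectBeats waits for n heartbeats, then stops the cycle and waits for
// the goroutine to exit. It returns the number of beats observed.
func collectBeats(n int, interval time.Duration) int {
	beats := make(chan time.Time)
	stop := make(chan struct{})
	done := startHeartbeat(interval, func(t time.Time) {
		select {
		case beats <- t:
		case <-stop: // do not block forever once the caller has stopped
		}
	}, stop)

	seen := 0
	for seen < n {
		<-beats
		seen++
	}
	close(stop)
	<-done
	return seen
}

func main() {
	fmt.Println(collectBeats(3, 10*time.Millisecond))
}
```

The caller returns immediately from `startHeartbeat`, which is the sense in which the cycle is asynchronous; the `finished` channel gives a teardown path a way to wait for a clean exit.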

func (*Applier) LockOriginalTable added in v0.9.2

func (this *Applier) LockOriginalTable() error

LockOriginalTable places a write lock on the original table

func (*Applier) ReadLastCheckpoint

func (this *Applier) ReadLastCheckpoint() (*Checkpoint, error)

func (*Applier) ReadMigrationRangeValues

func (this *Applier) ReadMigrationRangeValues() error

ReadMigrationRangeValues reads the min/max values that will be used for rowcopy. Before reading min/max, it writes a changelog state into the ghc table to avoid data loss related to MySQL two-phase commit.

Detailed description of the data-loss issue in MySQL two-phase commit, by @Fanduzi:

When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC,
if an INSERT statement is being committed but blocks due to an unmet ack count,
the data inserted by the transaction is not visible to ReadMigrationRangeValues,
so the copy of the existing data in the table does not include the new row inserted by the transaction.
However, the binlog event for the transaction is already written to the binlog,
so the addDMLEventsListener only captures binlog events after the transaction,
and thus the transaction's binlog event is not captured, resulting in data loss.

If a changelog is written to the ghc table before ReadMigrationRangeValues, and the transaction commit blocks
because the ack is not met, then the changelog write cannot complete either, so ReadMigrationRangeValues
will not run. When the changelog write succeeds, ReadMigrationRangeValues will read the
newly inserted data, thus avoiding data loss due to the above problem.

func (*Applier) RenameTablesRollback added in v0.9.2

func (this *Applier) RenameTablesRollback() (renameError error)

RenameTablesRollback renames back both tables: original back to ghost, _old back to original. This is used by `--test-on-replica`

func (*Applier) RevertAtomicCutOverWaitTimeout added in v1.1.7

func (this *Applier) RevertAtomicCutOverWaitTimeout()

RevertAtomicCutOverWaitTimeout restores the original wait_timeout for the applier session post-cut-over.

func (*Applier) ShowStatusVariable

func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error)

func (*Applier) StartReplication added in v1.0.35

func (this *Applier) StartReplication() error

StartReplication is used by `--test-on-replica` on cut-over failure

func (*Applier) StartSlaveIOThread added in v1.0.35

func (this *Applier) StartSlaveIOThread() error

StartSlaveIOThread is applicable with --test-on-replica

func (*Applier) StartSlaveSQLThread added in v0.7.16

func (this *Applier) StartSlaveSQLThread() error

StartSlaveSQLThread is applicable with --test-on-replica

func (*Applier) StateMetadataLockInstrument

func (this *Applier) StateMetadataLockInstrument() error

func (*Applier) StopReplication added in v0.9.6

func (this *Applier) StopReplication() error

StopReplication is used by `--test-on-replica` and stops replication.

func (*Applier) StopSlaveIOThread

func (this *Applier) StopSlaveIOThread() error

StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh. We need to keep the SQL thread active so as to complete processing received events, and have them written to the binary log, so that we can then read them via streamer.

func (*Applier) StopSlaveSQLThread added in v0.9.2

func (this *Applier) StopSlaveSQLThread() error

StopSlaveSQLThread is applicable with --test-on-replica

func (*Applier) SwapTablesQuickAndBumpy

func (this *Applier) SwapTablesQuickAndBumpy() error

SwapTablesQuickAndBumpy issues a two-step swap table operation:
- rename original table to _old
- rename ghost table to original

There is a point in time in between where the table does not exist.
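The two steps can be written out as a pair of statements to make the gap visible. This is a sketch under assumptions: `twoStepSwap` and `quoteIdent` are hypothetical helpers, the table names in `main` are illustrative, and `RENAME TABLE ... TO ...` is used as a generic MySQL form rather than the exact statement gh-ost issues.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent wraps a MySQL identifier in backticks, doubling any backtick
// that appears inside the name.
func quoteIdent(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// twoStepSwap returns the two rename statements in execution order.
// Between statement 0 and statement 1 no table carries the original name.
func twoStepSwap(schema, original, ghost, old string) []string {
	qualified := func(table string) string {
		return quoteIdent(schema) + "." + quoteIdent(table)
	}
	return []string{
		fmt.Sprintf("RENAME TABLE %s TO %s", qualified(original), qualified(old)),
		fmt.Sprintf("RENAME TABLE %s TO %s", qualified(ghost), qualified(original)),
	}
}

func main() {
	for i, stmt := range twoStepSwap("shop", "orders", "_orders_gho", "_orders_del") {
		fmt.Printf("step %d: %s\n", i+1, stmt)
	}
}
```

Because the statements run separately, a query arriving after step 1 and before step 2 finds no table under the original name, which is the "bumpy" part.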

func (*Applier) Teardown added in v1.0.44

func (this *Applier) Teardown()

func (*Applier) UnlockTables

func (this *Applier) UnlockTables() error

UnlockTables makes tea. No wait, it unlocks tables.

func (*Applier) ValidateOrDropExistingTables

func (this *Applier) ValidateOrDropExistingTables() error

ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, or attempts to drop them if instructed to.

func (*Applier) WriteAndLogChangelog

func (this *Applier) WriteAndLogChangelog(hint, value string) (string, error)

func (*Applier) WriteChangelog

func (this *Applier) WriteChangelog(hint, value string) (string, error)

WriteChangelog writes a value to the changelog table. It returns the hint as given, for convenience

func (*Applier) WriteChangelogState

func (this *Applier) WriteChangelogState(value string) (string, error)

func (*Applier) WriteCheckpoint

func (this *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error)

WriteCheckpoint writes a checkpoint to the _ghk table.

type BinlogEventListener

type BinlogEventListener struct {
	// contains filtered or unexported fields
}

type ChangelogState

type ChangelogState string
const (
	AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed"
	GhostTableMigrated         ChangelogState = "GhostTableMigrated"
	Migrated                   ChangelogState = "Migrated"
	ReadMigrationRangeValues   ChangelogState = "ReadMigrationRangeValues"
)

func ReadChangelogState added in v1.0.30

func ReadChangelogState(s string) ChangelogState

type Checkpoint

type Checkpoint struct {
	Id        int64
	Timestamp time.Time
	// LastTrxCoords are coordinates of a transaction
	// that has been applied on ghost table.
	LastTrxCoords mysql.BinlogCoordinates
	// IterationRangeMin is the min shared key value
	// for the chunk copier range.
	IterationRangeMin *sql.ColumnValues
	// IterationRangeMax is the max shared key value
	// for the chunk copier range.
	IterationRangeMax *sql.ColumnValues
	Iteration         int64
	RowsCopied        int64
	DMLApplied        int64
	IsCutover         bool
}

Checkpoint holds state necessary to resume a migration.

type EventsStreamer

type EventsStreamer struct {
	// contains filtered or unexported fields
}

EventsStreamer reads data from binary logs and streams it on. It acts as a publisher, and interested parties may subscribe for per-table events.

func NewEventsStreamer

func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer

func (*EventsStreamer) AddListener

func (this *EventsStreamer) AddListener(
	async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogEntry) error) (err error)

AddListener registers a new listener for binlog events, on a per-table basis
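The publisher/subscriber shape described for EventsStreamer can be sketched with a small in-memory registry. This is not the gh-ost implementation: `streamer`, `event`, `addListener` and `notify` are hypothetical names, the `async` flag from the real signature is left out, and the empty-name validation and case-insensitive matching are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// event is a simplified stand-in for a binlog entry.
type event struct {
	database, table, dml string
}

// listener pairs a table with the callback interested in its events.
type listener struct {
	database, table string
	onEvent         func(event) error
}

// streamer is a toy publisher holding per-table subscriptions.
type streamer struct {
	listeners []listener
}

// addListener registers a callback for one table. Rejecting empty names
// is an assumption of this sketch.
func (s *streamer) addListener(database, table string, onEvent func(event) error) error {
	if database == "" || table == "" {
		return errors.New("database and table names are required")
	}
	s.listeners = append(s.listeners, listener{database, table, onEvent})
	return nil
}

// notify delivers an event to every listener subscribed to its table and
// stops at the first callback error.
func (s *streamer) notify(e event) error {
	for _, l := range s.listeners {
		if !strings.EqualFold(l.database, e.database) || !strings.EqualFold(l.table, e.table) {
			continue
		}
		if err := l.onEvent(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &streamer{}
	_ = s.addListener("shop", "orders", func(e event) error {
		fmt.Println("orders listener got", e.dml)
		return nil
	})
	_ = s.notify(event{"shop", "orders", "insert"})
	_ = s.notify(event{"shop", "users", "insert"}) // no subscriber, ignored
}
```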

func (*EventsStreamer) Close added in v1.0.28

func (this *EventsStreamer) Close() (err error)

func (*EventsStreamer) GetCurrentBinlogCoordinates added in v0.7.16

func (this *EventsStreamer) GetCurrentBinlogCoordinates() mysql.BinlogCoordinates

func (*EventsStreamer) InitDBConnections

func (this *EventsStreamer) InitDBConnections() (err error)

func (*EventsStreamer) StreamEvents

func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error

StreamEvents will begin streaming events. It will be blocking, so should be executed by a goroutine
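Since the call blocks, a caller would typically run it in a goroutine and collect its result over a channel. The sketch below illustrates that calling pattern with a toy `streamEvents`; the function body, the `runStreamer` helper and the channel-based event source are all assumptions, and the way `canStopStreaming` is consulted here (only when the source ends) is one plausible reading, not a statement about gh-ost internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errSourceClosed = errors.New("event source closed unexpectedly")

// streamEvents is a toy blocking loop. When the source ends it asks
// canStopStreaming whether that was expected.
func streamEvents(events <-chan string, handle func(string), canStopStreaming func() bool) error {
	for {
		ev, ok := <-events
		if !ok {
			if canStopStreaming() {
				return nil // orderly shutdown
			}
			return errSourceClosed
		}
		handle(ev)
	}
}

// runStreamer starts the blocking call in a goroutine, feeds it inputs,
// signals that stopping is fine, and waits for the result.
func runStreamer(inputs []string) ([]string, error) {
	events := make(chan string)
	errCh := make(chan error, 1)
	var stopping atomic.Bool
	var got []string

	go func() {
		errCh <- streamEvents(events, func(ev string) { got = append(got, ev) }, stopping.Load)
	}()

	for _, in := range inputs {
		events <- in
	}
	stopping.Store(true)
	close(events)
	err := <-errCh // receiving here orders the reads of got after all writes
	return got, err
}

func main() {
	got, err := runStreamer([]string{"insert", "update", "delete"})
	fmt.Println(got, err)
}
```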

func (*EventsStreamer) Teardown added in v1.0.44

func (this *EventsStreamer) Teardown()

type HooksExecutor added in v1.0.17

type HooksExecutor struct {
	// contains filtered or unexported fields
}

func NewHooksExecutor added in v1.0.17

func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor

type Inspector

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector reads data from the read-MySQL-server (typically a replica, but can be the master). It is used for gaining initial status and structure, and later also to follow up on progress and changelog.

func NewInspector

func NewInspector(migrationContext *base.MigrationContext) *Inspector

func (*Inspector) CountTableRows added in v0.8.3

func (this *Inspector) CountTableRows(ctx context.Context) error

CountTableRows counts exact number of rows on the original table

func (*Inspector) InitDBConnections

func (this *Inspector) InitDBConnections() (err error)

func (*Inspector) InspectOriginalTable

func (this *Inspector) InspectOriginalTable() (err error)

func (*Inspector) InspectTableColumnsAndUniqueKeys

func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (columns *sql.ColumnList, virtualColumns *sql.ColumnList, uniqueKeys [](*sql.UniqueKey), err error)

func (*Inspector) Teardown added in v1.0.44

func (this *Inspector) Teardown()

func (*Inspector) ValidateOriginalTable

func (this *Inspector) ValidateOriginalTable() (err error)

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

Migrator is the main schema migration flow manager.

func NewMigrator

func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator

func (*Migrator) Checkpoint

func (this *Migrator) Checkpoint(ctx context.Context) (*Checkpoint, error)

Checkpoint attempts to write a checkpoint of the Migrator's current state. It gets the binlog coordinates of the last received trx and waits until the applier reaches that trx. At that point it's safe to resume from these coordinates.
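The "wait until the applier reaches that trx" step can be pictured as polling applied coordinates against a target until they catch up or the context expires. The sketch is an assumption-laden toy: `coords`, `progress`, `waitForApplier` and `checkpointDemo` are hypothetical, the polling approach is a guess at one workable design, and comparing binlog file names as plain strings is a simplification that only holds for equal-width numeric suffixes.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// coords is a simplified stand-in for binlog coordinates.
type coords struct {
	logFile string
	logPos  int64
}

// smallerThan orders coordinates by file name, then by position.
func (c coords) smallerThan(o coords) bool {
	if c.logFile != o.logFile {
		return c.logFile < o.logFile
	}
	return c.logPos < o.logPos
}

// progress holds the last applied coordinates behind a mutex.
type progress struct {
	mu      sync.Mutex
	applied coords
}

func (p *progress) set(c coords) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.applied = c
}

func (p *progress) get() coords {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.applied
}

// waitForApplier blocks until applied coordinates reach target, or until
// ctx is done, in which case it returns ctx.Err().
func waitForApplier(ctx context.Context, p *progress, target coords, poll time.Duration) error {
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		if !p.get().smallerThan(target) {
			return nil // safe point reached
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// checkpointDemo runs the wait with a short timeout, optionally letting a
// background goroutine advance the applier past the target.
func checkpointDemo(applierCatchesUp bool) error {
	p := &progress{applied: coords{"mysql-bin.000003", 100}}
	target := coords{"mysql-bin.000003", 400}
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	if applierCatchesUp {
		go p.set(coords{"mysql-bin.000003", 450})
	}
	return waitForApplier(ctx, p, target, 5*time.Millisecond)
}

func main() {
	fmt.Println(checkpointDemo(true))
	fmt.Println(checkpointDemo(false))
}
```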

func (*Migrator) CheckpointAfterCutOver

func (this *Migrator) CheckpointAfterCutOver() (*Checkpoint, error)

CheckpointAfterCutOver writes a final checkpoint after the cutover completes successfully.

func (*Migrator) ExecOnFailureHook added in v1.0.17

func (this *Migrator) ExecOnFailureHook() (err error)

ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external hook access point

func (*Migrator) Migrate

func (this *Migrator) Migrate() (err error)

Migrate executes the complete migration logic. This is *the* major gh-ost function.

func (*Migrator) Revert

func (this *Migrator) Revert() error

Revert reverts a migration that previously completed by applying all DML events that happened after the original cutover, then doing another cutover to swap the tables back. The steps are similar to Migrate(), but without row copying.

type PrintStatusRule added in v0.9.6

type PrintStatusRule int

type Server added in v0.8.4

type Server struct {
	// contains filtered or unexported fields
}

Server listens for requests on a socket file or via TCP

func NewServer added in v0.8.4

func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server

func (*Server) BindSocketFile added in v0.8.4

func (this *Server) BindSocketFile() (err error)

func (*Server) BindTCPPort added in v0.8.4

func (this *Server) BindTCPPort() (err error)

func (*Server) RemoveSocketFile added in v1.0.9

func (this *Server) RemoveSocketFile() (err error)

func (*Server) Serve added in v0.8.4

func (this *Server) Serve() (err error)

Serve begins listening & serving on whichever device was configured

type Throttler added in v1.0.17

type Throttler struct {
	// contains filtered or unexported fields
}

Throttler collects metrics related to throttling and makes an informed decision whether throttling should take place.

func NewThrottler added in v1.0.17

func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector, appVersion string) *Throttler

func (*Throttler) Teardown added in v1.0.44

func (this *Throttler) Teardown()
