Documentation
¶
Index ¶
- Constants
- func AppendForBackup(ctx context.Context, r *CheckpointRunner[BackupKeyType, BackupValueType], ...) error
- func AppendRangeForLogRestore(ctx context.Context, ...) error
- func AppendRangesForRestore(ctx context.Context, r *CheckpointRunner[RestoreKeyType, RestoreValueType], ...) error
- func ExistsCheckpointIngestIndexRepairSQLs(ctx context.Context, dom *domain.Domain) bool
- func ExistsCheckpointProgress(ctx context.Context, dom *domain.Domain) bool
- func ExistsLogRestoreCheckpointMetadata(ctx context.Context, dom *domain.Domain) bool
- func ExistsSnapshotRestoreCheckpoint(ctx context.Context, dom *domain.Domain) bool
- func IsCheckpointDB(dbname pmodel.CIStr) bool
- func LoadCheckpointChecksumForRestore(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor) (map[int64]*ChecksumItem, time.Duration, error)
- func LoadCheckpointDataForLogRestore[K KeyType, V ValueType](ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, fn func(K, V)) (time.Duration, error)
- func LoadCheckpointDataForSnapshotRestore[K KeyType, V ValueType](ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, fn func(K, V)) (time.Duration, error)
- func RemoveCheckpointDataForBackup(ctx context.Context, s storage.ExternalStorage) error
- func RemoveCheckpointDataForLogRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error
- func RemoveCheckpointDataForSnapshotRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error
- func SaveCheckpointIngestIndexRepairSQLs(ctx context.Context, se glue.Session, meta *CheckpointIngestIndexRepairSQLs) error
- func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, ...) error
- func SaveCheckpointMetadataForLogRestore(ctx context.Context, se glue.Session, meta *CheckpointMetadataForLogRestore) error
- func SaveCheckpointMetadataForSnapshotRestore(ctx context.Context, se glue.Session, ...) error
- func SaveCheckpointProgress(ctx context.Context, se glue.Session, meta *CheckpointProgress) error
- func WalkCheckpointFileForBackup(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, ...) (time.Duration, error)
- type BackupKeyType
- type BackupValueType
- type CheckpointData
- type CheckpointIngestIndexRepairSQL
- type CheckpointIngestIndexRepairSQLs
- type CheckpointLock
- type CheckpointMessage
- type CheckpointMetadataForBackup
- type CheckpointMetadataForLogRestore
- type CheckpointMetadataForSnapshotRestore
- type CheckpointProgress
- type CheckpointRunner
- func StartCheckpointBackupRunnerForTest(ctx context.Context, storage storage.ExternalStorage, ...) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
- func StartCheckpointLogRestoreRunnerForTest(ctx context.Context, se glue.Session, tick time.Duration) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
- func StartCheckpointRestoreRunnerForTest(ctx context.Context, se glue.Session, tick time.Duration, ...) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
- func StartCheckpointRunnerForBackup(ctx context.Context, storage storage.ExternalStorage, ...) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
- func StartCheckpointRunnerForLogRestore(ctx context.Context, se glue.Session) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
- func StartCheckpointRunnerForRestore(ctx context.Context, se glue.Session) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
- func (r *CheckpointRunner[K, V]) Append(ctx context.Context, message *CheckpointMessage[K, V]) error
- func (r *CheckpointRunner[K, V]) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, ...) error
- func (r *CheckpointRunner[K, V]) FlushChecksumItem(ctx context.Context, checksumItem *ChecksumItem) error
- func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
- type CheckpointTaskInfoForLogRestore
- type ChecksumInfo
- type ChecksumItem
- type ChecksumItems
- type GlobalTimer
- type KeyType
- type LogRestoreKeyType
- type LogRestoreValueMarshaled
- type LogRestoreValueType
- type RangeGroup
- type RangeGroupData
- type RangeType
- type RestoreKeyType
- type RestoreProgress
- type RestoreValueType
- type TimeTicker
- type ValueType
Constants ¶
const ( CheckpointBackupDir = CheckpointDir + "/backup" CheckpointDataDirForBackup = CheckpointBackupDir + "/data" CheckpointChecksumDirForBackup = CheckpointBackupDir + "/checksum" CheckpointMetaPathForBackup = CheckpointBackupDir + "/checkpoint.meta" CheckpointLockPathForBackup = CheckpointBackupDir + "/checkpoint.lock" )
const ( LogRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint" SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint" )
Notice that: 1. the checkpoint table only records one task checkpoint. 2. BR regards the metadata table as a file so that it is not empty if the table exists. 3. BR regards the checkpoint table as a directory which is managed by metadata table.
const CheckpointDir = "checkpoints"
const CheckpointIdMapBlockSize int = 524288
const MaxChecksumTotalCost float64 = 60.0
Variables ¶
This section is empty.
Functions ¶
func AppendForBackup ¶
func AppendForBackup( ctx context.Context, r *CheckpointRunner[BackupKeyType, BackupValueType], groupKey BackupKeyType, startKey []byte, endKey []byte, files []*backuppb.File, ) error
func AppendRangeForLogRestore ¶
func AppendRangeForLogRestore( ctx context.Context, r *CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], groupKey LogRestoreKeyType, tableID int64, goff int, foff int, ) error
func AppendRangesForRestore ¶
func AppendRangesForRestore( ctx context.Context, r *CheckpointRunner[RestoreKeyType, RestoreValueType], tableID RestoreKeyType, rangeKey string, ) error
func IsCheckpointDB ¶
IsCheckpointDB checks whether the dbname is checkpoint database.
func LoadCheckpointChecksumForRestore ¶
func LoadCheckpointChecksumForRestore( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (map[int64]*ChecksumItem, time.Duration, error)
func LoadCheckpointDataForLogRestore ¶
func LoadCheckpointDataForLogRestore[K KeyType, V ValueType]( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, fn func(K, V), ) (time.Duration, error)
load the whole checkpoint range data and retrieve the metadata of restored ranges and return the total time cost in the past executions
func LoadCheckpointDataForSnapshotRestore ¶
func LoadCheckpointDataForSnapshotRestore[K KeyType, V ValueType]( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, fn func(K, V), ) (time.Duration, error)
load the whole checkpoint range data and retrieve the metadata of restored ranges and return the total time cost in the past executions
func RemoveCheckpointDataForBackup ¶
func RemoveCheckpointDataForBackup(ctx context.Context, s storage.ExternalStorage) error
func SaveCheckpointMetadata ¶
func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, meta *CheckpointMetadataForBackup) error
save the checkpoint metadata into the external storage
func SaveCheckpointProgress ¶
func WalkCheckpointFileForBackup ¶
func WalkCheckpointFileForBackup( ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(BackupKeyType, BackupValueType), ) (time.Duration, error)
walk the whole checkpoint range files and retrieve the metadata of backed up ranges and return the total time cost in the past executions
Types ¶
type BackupKeyType ¶
type BackupKeyType = string
type BackupValueType ¶
type BackupValueType = RangeType
type CheckpointData ¶
type CheckpointData struct {
DureTime time.Duration `json:"dure-time"`
RangeGroupMetas []*RangeGroupData `json:"range-group-metas"`
}
type CheckpointIngestIndexRepairSQL ¶
type CheckpointIngestIndexRepairSQL struct {
IndexID int64 `json:"index-id"`
SchemaName pmodel.CIStr `json:"schema-name"`
TableName pmodel.CIStr `json:"table-name"`
IndexName string `json:"index-name"`
AddSQL string `json:"add-sql"`
AddArgs []any `json:"add-args"`
OldIndexIDFound bool `json:"-"`
IndexRepaired bool `json:"-"`
}
type CheckpointIngestIndexRepairSQLs ¶
type CheckpointIngestIndexRepairSQLs struct {
SQLs []CheckpointIngestIndexRepairSQL
}
func LoadCheckpointIngestIndexRepairSQLs ¶
func LoadCheckpointIngestIndexRepairSQLs( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointIngestIndexRepairSQLs, error)
type CheckpointLock ¶
type CheckpointMessage ¶
type CheckpointMetadataForBackup ¶
type CheckpointMetadataForBackup struct {
GCServiceId string `json:"gc-service-id"`
ConfigHash []byte `json:"config-hash"`
BackupTS uint64 `json:"backup-ts"`
Ranges []rtree.Range `json:"ranges"`
CheckpointChecksum map[int64]*ChecksumItem `json:"-"`
CheckpointDataMap map[string]rtree.RangeTree `json:"-"`
}
func LoadCheckpointMetadata ¶
func LoadCheckpointMetadata(ctx context.Context, s storage.ExternalStorage) (*CheckpointMetadataForBackup, error)
load checkpoint metadata from the external storage
type CheckpointMetadataForLogRestore ¶
type CheckpointMetadataForLogRestore struct {
UpstreamClusterID uint64 `json:"upstream-cluster-id"`
RestoredTS uint64 `json:"restored-ts"`
StartTS uint64 `json:"start-ts"`
RewriteTS uint64 `json:"rewrite-ts"`
GcRatio string `json:"gc-ratio"`
// tiflash recorder items with snapshot restore records
TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"`
}
func LoadCheckpointMetadataForLogRestore ¶
func LoadCheckpointMetadataForLogRestore( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointMetadataForLogRestore, error)
type CheckpointMetadataForSnapshotRestore ¶
type CheckpointMetadataForSnapshotRestore struct {
UpstreamClusterID uint64 `json:"upstream-cluster-id"`
RestoredTS uint64 `json:"restored-ts"`
SchedulersConfig *pdutil.ClusterConfig `json:"schedulers-config"`
}
func LoadCheckpointMetadataForSnapshotRestore ¶
func LoadCheckpointMetadataForSnapshotRestore( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointMetadataForSnapshotRestore, error)
type CheckpointProgress ¶
type CheckpointProgress struct {
Progress RestoreProgress `json:"progress"`
}
func LoadCheckpointProgress ¶
func LoadCheckpointProgress( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointProgress, error)
type CheckpointRunner ¶
func StartCheckpointBackupRunnerForTest ¶
func StartCheckpointBackupRunnerForTest( ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer, ) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
only for test
func StartCheckpointLogRestoreRunnerForTest ¶
func StartCheckpointLogRestoreRunnerForTest( ctx context.Context, se glue.Session, tick time.Duration, ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
only for test
func StartCheckpointRestoreRunnerForTest ¶
func StartCheckpointRestoreRunnerForTest( ctx context.Context, se glue.Session, tick time.Duration, retryDuration time.Duration, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
only for test
func StartCheckpointRunnerForBackup ¶
func StartCheckpointRunnerForBackup( ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer, ) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
func StartCheckpointRunnerForLogRestore ¶
func StartCheckpointRunnerForLogRestore( ctx context.Context, se glue.Session, ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
func StartCheckpointRunnerForRestore ¶
func StartCheckpointRunnerForRestore( ctx context.Context, se glue.Session, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
func (*CheckpointRunner[K, V]) Append ¶
func (r *CheckpointRunner[K, V]) Append( ctx context.Context, message *CheckpointMessage[K, V], ) error
func (*CheckpointRunner[K, V]) FlushChecksum ¶
func (*CheckpointRunner[K, V]) FlushChecksumItem ¶
func (r *CheckpointRunner[K, V]) FlushChecksumItem( ctx context.Context, checksumItem *ChecksumItem, ) error
func (*CheckpointRunner[K, V]) WaitForFinish ¶
func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
Note: Cannot be parallel with `Append` function
type CheckpointTaskInfoForLogRestore ¶
type CheckpointTaskInfoForLogRestore struct {
Metadata *CheckpointMetadataForLogRestore
HasSnapshotMetadata bool
// the progress for this task
Progress RestoreProgress
}
CheckpointTaskInfo is unique information within the same cluster id. It represents the last restore task executed for this cluster.
func TryToGetCheckpointTaskInfo ¶
func TryToGetCheckpointTaskInfo( ctx context.Context, dom *domain.Domain, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointTaskInfoForLogRestore, error)
type ChecksumInfo ¶
type ChecksumItem ¶
type ChecksumItems ¶
type ChecksumItems struct {
Items []*ChecksumItem `json:"checksum-items"`
}
type KeyType ¶
type KeyType interface {
~BackupKeyType | ~RestoreKeyType
}
type LogRestoreKeyType ¶
type LogRestoreKeyType = string
type LogRestoreValueMarshaled ¶
type LogRestoreValueMarshaled struct {
// group index in the metadata
Goff int `json:"goff"`
// downstream table id -> file indexes in the group
Foffs map[int64][]int `json:"foffs"`
}
func (LogRestoreValueMarshaled) IdentKey ¶
func (l LogRestoreValueMarshaled) IdentKey() []byte
type LogRestoreValueType ¶
type LogRestoreValueType struct {
// downstream table id
TableID int64
// group index in the metadata
Goff int
// file index in the group
Foff int
}
func (LogRestoreValueType) IdentKey ¶
func (l LogRestoreValueType) IdentKey() []byte
type RangeGroup ¶
type RangeGroupData ¶
type RestoreKeyType ¶
type RestoreKeyType = int64
type RestoreProgress ¶
type RestoreProgress int
A progress type for snapshot + log restore.
Before the id-maps is persist into external storage, the snapshot restore and id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`, it can retry from snapshot restore.
After the id-maps is persist into external storage, there are some meta-kvs has been restored into the cluster, such as `rename ddl`. Where would be a situation:
the first execution:
table A created in snapshot restore is renamed to table B in log restore
table A (id 80) --------------> table B (id 80)
( snapshot restore ) ( log restore )
the second execution if don't skip snasphot restore:
table A is created again in snapshot restore, because there is no table named A
table A (id 81) --------------> [not in id-maps, so ignored]
( snapshot restore ) ( log restore )
Finally, there is a duplicated table A in the cluster. Therefore, need to skip snapshot restore when the progress is `InLogRestoreAndIdMapPersist`.
const ( InSnapshotRestore RestoreProgress = iota // Only when the id-maps is persist, status turns into it. InLogRestoreAndIdMapPersist )
type RestoreValueType ¶
type RestoreValueType struct {
// the file key of a range
RangeKey string
}
func (RestoreValueType) IdentKey ¶
func (rv RestoreValueType) IdentKey() []byte