checkpoint

package
v1.1.0-beta.0...-17a245e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2026 License: Apache-2.0, Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CheckpointBackupDir = CheckpointDir + "/backup"

	CheckpointDataDirForBackup     = CheckpointBackupDir + "/data"
	CheckpointChecksumDirForBackup = CheckpointBackupDir + "/checksum"
	CheckpointMetaPathForBackup    = CheckpointBackupDir + "/checkpoint.meta"
	CheckpointLockPathForBackup    = CheckpointBackupDir + "/checkpoint.lock"
)
View Source
const (
	CheckpointRestoreDirFormat                = CheckpointDir + "/restore-%s"
	CheckpointDataDirForRestoreFormat         = CheckpointRestoreDirFormat + "/data"
	CheckpointChecksumDirForRestoreFormat     = CheckpointRestoreDirFormat + "/checksum"
	CheckpointMetaPathForRestoreFormat        = CheckpointRestoreDirFormat + "/checkpoint.meta"
	CheckpointProgressPathForRestoreFormat    = CheckpointRestoreDirFormat + "/progress.meta"
	CheckpointIngestIndexPathForRestoreFormat = CheckpointRestoreDirFormat + "/ingest_index.meta"
)
View Source
const (
	LogRestoreCheckpointDatabaseName       string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint"
	SnapshotRestoreCheckpointDatabaseName  string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"
	CustomSSTRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint"
)

Notice that: 1. the checkpoint table only records one task checkpoint. 2. BR regards the metadata table as a file so that it is not empty if the table exists. 3. BR regards the checkpoint table as a directory which is managed by metadata table.

View Source
const CheckpointDir = "checkpoints"
View Source
const CheckpointIdMapBlockSize int = 524288
View Source
const MaxChecksumTotalCost float64 = 60.0

Variables

This section is empty.

Functions

func AppendForBackup

func AppendForBackup(
	ctx context.Context,
	r *CheckpointRunner[BackupKeyType, BackupValueType],
	startKey []byte,
	endKey []byte,
	files []*backuppb.File,
) error

func AppendRangeForLogRestore

func AppendRangeForLogRestore(
	ctx context.Context,
	r *CheckpointRunner[LogRestoreKeyType, LogRestoreValueType],
	groupKey LogRestoreKeyType,
	tableID int64,
	goff int,
	foff int,
) error

func DefaultTickDurationConfig

func DefaultTickDurationConfig() tickDurationConfig

func IsCheckpointDB

func IsCheckpointDB(dbname string) bool

IsCheckpointDB checks whether the dbname is checkpoint database.

func RemoveCheckpointDataForBackup

func RemoveCheckpointDataForBackup(ctx context.Context, s storage.ExternalStorage) error

func SaveCheckpointMetadata

func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, meta *CheckpointMetadataForBackup) error

save the checkpoint metadata into the external storage

func WalkCheckpointFileForBackup

func WalkCheckpointFileForBackup(
	ctx context.Context,
	s storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	fn func(BackupKeyType, BackupValueType) error,
) (time.Duration, error)

walk the whole checkpoint range files and retrieve the metadata of backed up ranges and return the total time cost in the past executions

Types

type BackupKeyType

type BackupKeyType = string

type BackupValueType

type BackupValueType = RangeType

type CheckpointData

type CheckpointData struct {
	DureTime        time.Duration     `json:"dure-time"`
	RangeGroupMetas []*RangeGroupData `json:"range-group-metas"`
}

type CheckpointForeignKeyUpdateSQL

type CheckpointForeignKeyUpdateSQL struct {
	FKID       int64  `json:"fk-id"`
	SchemaName string `json:"schema-name"`
	TableName  string `json:"table-name"`
	FKName     string `json:"fk-name"`
	AddSQL     string `json:"add-sql"`
	AddArgs    []any  `json:"add-args"`

	OldForeignKeyFound bool `json:"-"`
	ForeignKeyUpdated  bool `json:"-"`
}

type CheckpointIngestIndexRepairSQL

type CheckpointIngestIndexRepairSQL struct {
	IndexID    int64     `json:"index-id"`
	SchemaName ast.CIStr `json:"schema-name"`
	TableName  ast.CIStr `json:"table-name"`
	IndexName  string    `json:"index-name"`
	AddSQL     string    `json:"add-sql"`
	AddArgs    []any     `json:"add-args"`

	OldIndexIDFound bool `json:"-"`
	IndexRepaired   bool `json:"-"`
}

type CheckpointIngestIndexRepairSQLs

type CheckpointIngestIndexRepairSQLs struct {
	SQLs   []CheckpointIngestIndexRepairSQL
	FKSQLs []CheckpointForeignKeyUpdateSQL
}

type CheckpointItem

type CheckpointItem struct {
	// contains filtered or unexported fields
}

func NewCheckpointFileItem

func NewCheckpointFileItem(tableID RestoreKeyType, fileName string) *CheckpointItem

func NewCheckpointRangeKeyItem

func NewCheckpointRangeKeyItem(tableID RestoreKeyType, rangeKey string) *CheckpointItem

type CheckpointLock

type CheckpointLock struct {
	LockId   uint64 `json:"lock-id"`
	ExpireAt int64  `json:"expire-at"`
}

type CheckpointMessage

type CheckpointMessage[K KeyType, V ValueType] struct {
	// start-key of the origin range
	GroupKey K

	Group []V
}

type CheckpointMetadataForBackup

type CheckpointMetadataForBackup struct {
	GCServiceId string `json:"gc-service-id"`
	ConfigHash  []byte `json:"config-hash"`
	BackupTS    uint64 `json:"backup-ts"`

	CheckpointChecksum    map[int64]*ChecksumItem `json:"-"`
	LoadCheckpointDataMap bool                    `json:"-"`
}

func LoadCheckpointMetadata

func LoadCheckpointMetadata(ctx context.Context, s storage.ExternalStorage) (*CheckpointMetadataForBackup, error)

load checkpoint metadata from the external storage

type CheckpointMetadataForLogRestore

type CheckpointMetadataForLogRestore struct {
	UpstreamClusterID uint64 `json:"upstream-cluster-id"`
	RestoredTS        uint64 `json:"restored-ts"`
	StartTS           uint64 `json:"start-ts"`
	RewriteTS         uint64 `json:"rewrite-ts"`
	GcRatio           string `json:"gc-ratio"`
	// tiflash recorder items with snapshot restore records
	TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"`
}

type CheckpointMetadataForSnapshotRestore

type CheckpointMetadataForSnapshotRestore struct {
	UpstreamClusterID uint64                `json:"upstream-cluster-id"`
	RestoredTS        uint64                `json:"restored-ts"`
	LogRestoredTS     uint64                `json:"log-restored-ts"`
	SchedulersConfig  *pdutil.ClusterConfig `json:"schedulers-config"`
	Hash              []byte                `json:"hash"`
	PreallocIDs       *PreallocIDs          `json:"prealloc-ids"`

	RestoreUUID uuid.UUID `json:"restore-uuid"`
}

type CheckpointProgress

type CheckpointProgress struct {
	Progress RestoreProgress `json:"progress"`
}

type CheckpointRunner

type CheckpointRunner[K KeyType, V ValueType] struct {
	// contains filtered or unexported fields
}

func StartCheckpointBackupRunnerForTest

func StartCheckpointBackupRunnerForTest(
	ctx context.Context,
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	tick time.Duration,
	timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error)

only for test

func StartCheckpointLogRestoreRunnerForTest

func StartCheckpointLogRestoreRunnerForTest(
	ctx context.Context,
	tick time.Duration,
	manager LogMetaManagerT,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)

only for test

func StartCheckpointRestoreRunnerForTest

func StartCheckpointRestoreRunnerForTest(
	ctx context.Context,
	tick time.Duration,
	retryDuration time.Duration,
	manager SnapshotMetaManagerT,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)

only for test

func StartCheckpointRunnerForLogRestore

func StartCheckpointRunnerForLogRestore(
	ctx context.Context,
	manager LogMetaManagerT,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)

Notice that the session is owned by the checkpoint runner, and it will be also closed by it.

func StartCheckpointRunnerForRestore

func StartCheckpointRunnerForRestore(
	ctx context.Context,
	manager SnapshotMetaManagerT,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)

Notice that the session is owned by the checkpoint runner, and it will be also closed by it.

func (*CheckpointRunner[K, V]) Append

func (r *CheckpointRunner[K, V]) Append(
	ctx context.Context,
	message *CheckpointMessage[K, V],
) error

func (*CheckpointRunner[K, V]) FlushChecksum

func (r *CheckpointRunner[K, V]) FlushChecksum(
	ctx context.Context,
	tableID int64,
	crc64xor uint64,
	totalKvs uint64,
	totalBytes uint64,
) error

func (*CheckpointRunner[K, V]) FlushChecksumItem

func (r *CheckpointRunner[K, V]) FlushChecksumItem(
	ctx context.Context,
	checksumItem *ChecksumItem,
) error

func (*CheckpointRunner[K, V]) WaitForFinish

func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)

Note: Cannot be parallel with `Append` function

type ChecksumInfo

type ChecksumInfo struct {
	Content  []byte        `json:"content"`
	Checksum []byte        `json:"checksum"`
	DureTime time.Duration `json:"dure-time"`
}

type ChecksumItem

type ChecksumItem struct {
	TableID    int64  `json:"table-id"`
	Crc64xor   uint64 `json:"crc64-xor"`
	TotalKvs   uint64 `json:"total-kvs"`
	TotalBytes uint64 `json:"total-bytes"`
}

type ChecksumItems

type ChecksumItems struct {
	Items []*ChecksumItem `json:"checksum-items"`
}

type GlobalTimer

type GlobalTimer interface {
	GetTS(context.Context) (int64, int64, error)
}

type KeyType

type KeyType interface {
	~BackupKeyType | ~RestoreKeyType
}

type LogMetaManager

type LogMetaManager[K KeyType, SV, LV ValueType, M any] interface {
	MetaManager[K, SV, LV, M]

	LoadCheckpointProgress(context.Context) (*CheckpointProgress, error)
	SaveCheckpointProgress(context.Context, *CheckpointProgress) error
	ExistsCheckpointProgress(context.Context) (bool, error)

	LoadCheckpointIngestIndexRepairSQLs(context.Context) (*CheckpointIngestIndexRepairSQLs, error)
	SaveCheckpointIngestIndexRepairSQLs(context.Context, *CheckpointIngestIndexRepairSQLs) error
	ExistsCheckpointIngestIndexRepairSQLs(context.Context) (bool, error)

	TryGetStorage() storage.ExternalStorage
}

type LogMetaManagerT

func NewLogStorageMetaManager

func NewLogStorageMetaManager(
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	clusterID uint64,
	prefix string,
	restoreID uint64,
) LogMetaManagerT

func NewLogTableMetaManager

func NewLogTableMetaManager(
	g glue.Glue,
	dom *domain.Domain,
	dbName string,
	restoreID uint64,
) (LogMetaManagerT, error)

type LogRestoreKeyType

type LogRestoreKeyType = string

type LogRestoreValueMarshaled

type LogRestoreValueMarshaled struct {
	// group index in the metadata
	Goff int `json:"goff"`
	// downstream table id -> file indexes in the group
	Foffs map[int64][]int `json:"foffs"`
}

type LogRestoreValueType

type LogRestoreValueType struct {
	// downstream table id
	TableID int64
	// group index in the metadata
	Goff int
	// file index in the group
	Foff int
}

type MetaManager

type MetaManager[K KeyType, SV, LV ValueType, M any] interface {
	fmt.Stringer

	LoadCheckpointData(context.Context, func(K, LV) error) (time.Duration, error)
	LoadCheckpointChecksum(context.Context) (map[int64]*ChecksumItem, time.Duration, error)
	LoadCheckpointMetadata(context.Context) (*M, error)
	SaveCheckpointMetadata(context.Context, *M) error
	ExistsCheckpointMetadata(context.Context) (bool, error)
	RemoveCheckpointData(context.Context) error

	// start checkpoint runner
	StartCheckpointRunner(
		context.Context, tickDurationConfig, func(*RangeGroup[K, SV]) ([]byte, error),
	) (*CheckpointRunner[K, SV], error)

	// close session
	Close()
}

type PreallocIDs

type PreallocIDs struct {
	Start          int64
	ReusableBorder int64
	End            int64
	Hash           [32]byte
}

type RangeGroup

type RangeGroup[K KeyType, V ValueType] struct {
	GroupKey K   `json:"group-key,omitempty"`
	Group    []V `json:"groups"`
}

type RangeGroupData

type RangeGroupData struct {
	RangeGroupsEncriptedData []byte
	Checksum                 []byte
	CipherIv                 []byte

	Size int
}

type RangeType

type RangeType struct {
	*rtree.Range
}

type RestoreKeyType

type RestoreKeyType = int64

type RestoreProgress

type RestoreProgress int

RestoreProgress is a progress type for snapshot + log restore.

Before the id-maps is persisted into external storage, the snapshot restore and id-maps building can be retried. So if the progress is in `InSnapshotRestore`, it can retry from snapshot restore.

After the id-maps is persisted into external storage, there are some meta-kvs has been restored into the cluster, such as `rename ddl`. A situation could be:

the first execution:

table A created in snapshot restore is renamed to table B in log restore
     table A (id 80)       -------------->        table B (id 80)
  ( snapshot restore )                            ( log restore )

the second execution if don't skip snapshot restore:

table A is created again in snapshot restore, because there is no table named A
     table A (id 81)       -------------->   [not in id-maps, so ignored]
  ( snapshot restore )                            ( log restore )

Finally, there is a duplicated table A in the cluster. Therefore, need to skip snapshot restore when the progress is `InLogRestoreAndIdMapPersist`.

const (
	InSnapshotRestore RestoreProgress = iota
	// Only when the id-maps is persisted, status turns into it.
	InLogRestoreAndIdMapPersisted
)

type RestoreValueType

type RestoreValueType struct {
	// the file key of a range
	RangeKey string `json:"range-key,omitempty"`
	// the file name, used for compacted restore
	Name string `json:"name,omitempty"`
}

type SnapshotMetaManagerT

func NewSnapshotStorageMetaManager

func NewSnapshotStorageMetaManager(
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	clusterID uint64,
	prefix string,
	restoreID uint64,
) SnapshotMetaManagerT

func NewSnapshotTableMetaManager

func NewSnapshotTableMetaManager(
	g glue.Glue,
	dom *domain.Domain,
	dbName string,
	restoreID uint64,
) (SnapshotMetaManagerT, error)

type StorageMetaManager

type StorageMetaManager[K KeyType, SV, LV ValueType, M any] struct {
	// contains filtered or unexported fields
}

func (*StorageMetaManager[K, SV, LV, M]) Close

func (manager *StorageMetaManager[K, SV, LV, M]) Close()

func (*StorageMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs

func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
) (bool, error)

func (*StorageMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata

func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata(
	ctx context.Context,
) (bool, error)

func (*StorageMetaManager[K, SV, LV, M]) ExistsCheckpointProgress

func (manager *StorageMetaManager[K, SV, LV, M]) ExistsCheckpointProgress(
	ctx context.Context,
) (bool, error)

func (*StorageMetaManager[K, SV, LV, M]) LoadCheckpointChecksum

func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointChecksum(
	ctx context.Context,
) (map[int64]*ChecksumItem, time.Duration, error)

func (*StorageMetaManager[K, SV, LV, M]) LoadCheckpointData

func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointData(
	ctx context.Context,
	fn func(K, LV) error,
) (time.Duration, error)

func (*StorageMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs

func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
) (*CheckpointIngestIndexRepairSQLs, error)

func (*StorageMetaManager[K, SV, LV, M]) LoadCheckpointMetadata

func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointMetadata(
	ctx context.Context,
) (*M, error)

func (*StorageMetaManager[K, SV, LV, M]) LoadCheckpointProgress

func (manager *StorageMetaManager[K, SV, LV, M]) LoadCheckpointProgress(
	ctx context.Context,
) (*CheckpointProgress, error)

func (*StorageMetaManager[K, SV, LV, M]) RemoveCheckpointData

func (manager *StorageMetaManager[K, SV, LV, M]) RemoveCheckpointData(
	ctx context.Context,
) error

func (*StorageMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs

func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
	meta *CheckpointIngestIndexRepairSQLs,
) error

func (*StorageMetaManager[K, SV, LV, M]) SaveCheckpointMetadata

func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointMetadata(
	ctx context.Context,
	meta *M,
) error

func (*StorageMetaManager[K, SV, LV, M]) SaveCheckpointProgress

func (manager *StorageMetaManager[K, SV, LV, M]) SaveCheckpointProgress(
	ctx context.Context,
	meta *CheckpointProgress,
) error

func (*StorageMetaManager[K, SV, LV, M]) StartCheckpointRunner

func (manager *StorageMetaManager[K, SV, LV, M]) StartCheckpointRunner(
	ctx context.Context,
	cfg tickDurationConfig,
	valueMarshaler func(*RangeGroup[K, SV]) ([]byte, error),
) (*CheckpointRunner[K, SV], error)

func (*StorageMetaManager[K, SV, LV, M]) String

func (manager *StorageMetaManager[K, SV, LV, M]) String() string

func (*StorageMetaManager[K, SV, LV, M]) TryGetStorage

func (manager *StorageMetaManager[K, SV, LV, M]) TryGetStorage() storage.ExternalStorage

type TableMetaManager

type TableMetaManager[K KeyType, SV, LV ValueType, M any] struct {
	// contains filtered or unexported fields
}

func (*TableMetaManager[K, SV, LV, M]) Close

func (manager *TableMetaManager[K, SV, LV, M]) Close()

func (*TableMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs

func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
) (bool, error)

func (*TableMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata

func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointMetadata(
	ctx context.Context,
) (bool, error)

func (*TableMetaManager[K, SV, LV, M]) ExistsCheckpointProgress

func (manager *TableMetaManager[K, SV, LV, M]) ExistsCheckpointProgress(
	ctx context.Context,
) (bool, error)

func (*TableMetaManager[K, SV, LV, M]) LoadCheckpointChecksum

func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointChecksum(
	ctx context.Context,
) (map[int64]*ChecksumItem, time.Duration, error)

func (*TableMetaManager[K, SV, LV, M]) LoadCheckpointData

func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointData(
	ctx context.Context,
	fn func(K, LV) error,
) (time.Duration, error)

LoadCheckpointData loads the whole checkpoint range data and retrieve the metadata of restored ranges and return the total time cost in the past executions

func (*TableMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs

func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
) (*CheckpointIngestIndexRepairSQLs, error)

func (*TableMetaManager[K, SV, LV, M]) LoadCheckpointMetadata

func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointMetadata(
	ctx context.Context,
) (*M, error)

func (*TableMetaManager[K, SV, LV, M]) LoadCheckpointProgress

func (manager *TableMetaManager[K, SV, LV, M]) LoadCheckpointProgress(
	ctx context.Context,
) (*CheckpointProgress, error)

func (*TableMetaManager[K, SV, LV, M]) RemoveCheckpointData

func (manager *TableMetaManager[K, SV, LV, M]) RemoveCheckpointData(
	ctx context.Context,
) error

func (*TableMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs

func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
	meta *CheckpointIngestIndexRepairSQLs,
) error

func (*TableMetaManager[K, SV, LV, M]) SaveCheckpointMetadata

func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointMetadata(
	ctx context.Context,
	meta *M,
) error

func (*TableMetaManager[K, SV, LV, M]) SaveCheckpointProgress

func (manager *TableMetaManager[K, SV, LV, M]) SaveCheckpointProgress(
	ctx context.Context,
	meta *CheckpointProgress,
) error

func (*TableMetaManager[K, SV, LV, M]) StartCheckpointRunner

func (manager *TableMetaManager[K, SV, LV, M]) StartCheckpointRunner(
	ctx context.Context,
	cfg tickDurationConfig,
	valueMarshaler func(*RangeGroup[K, SV]) ([]byte, error),
) (*CheckpointRunner[K, SV], error)

func (*TableMetaManager[K, SV, LV, M]) String

func (manager *TableMetaManager[K, SV, LV, M]) String() string

func (*TableMetaManager[K, SV, LV, M]) TryGetStorage

func (manager *TableMetaManager[K, SV, LV, M]) TryGetStorage() storage.ExternalStorage

type TaskInfoForLogRestore

type TaskInfoForLogRestore struct {
	Metadata            *CheckpointMetadataForLogRestore
	HasSnapshotMetadata bool
	// the progress for this task
	Progress RestoreProgress
}

TaskInfoForLogRestore is tied to a specific cluster. It represents the last restore task executed in this cluster.

func GetCheckpointTaskInfo

func GetCheckpointTaskInfo(
	ctx context.Context,
	snapshotManager SnapshotMetaManagerT,
	logManager LogMetaManagerT,
) (*TaskInfoForLogRestore, error)

func (*TaskInfoForLogRestore) IdMapSaved

func (t *TaskInfoForLogRestore) IdMapSaved() bool

type TimeTicker

type TimeTicker interface {
	Ch() <-chan time.Time
	Stop()
}

type ValueType

type ValueType any

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL