Documentation
¶
Index ¶
- Constants
- Variables
- func IsCheckpointTable(name string) bool
- func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error)
- func Transact(ctx context.Context, purpose string, s Session, logger log.Logger, ...) error
- type CheckpointStatus
- type CheckpointsDB
- type CheckpointsModel
- func (*CheckpointsModel) Descriptor() ([]byte, []int)
- func (m *CheckpointsModel) Marshal() (dAtA []byte, err error)
- func (m *CheckpointsModel) MarshalTo(dAtA []byte) (int, error)
- func (m *CheckpointsModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*CheckpointsModel) ProtoMessage()
- func (m *CheckpointsModel) Reset()
- func (m *CheckpointsModel) Size() (n int)
- func (m *CheckpointsModel) String() string
- func (m *CheckpointsModel) Unmarshal(dAtA []byte) error
- func (m *CheckpointsModel) XXX_DiscardUnknown()
- func (m *CheckpointsModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *CheckpointsModel) XXX_Merge(src proto.Message)
- func (m *CheckpointsModel) XXX_Size() int
- func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error
- type ChunkCheckpoint
- type ChunkCheckpointKey
- type ChunkCheckpointMerger
- type ChunkCheckpointModel
- func (*ChunkCheckpointModel) Descriptor() ([]byte, []int)
- func (m *ChunkCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *ChunkCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *ChunkCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ChunkCheckpointModel) ProtoMessage()
- func (m *ChunkCheckpointModel) Reset()
- func (m *ChunkCheckpointModel) Size() (n int)
- func (m *ChunkCheckpointModel) String() string
- func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *ChunkCheckpointModel) XXX_DiscardUnknown()
- func (m *ChunkCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ChunkCheckpointModel) XXX_Merge(src proto.Message)
- func (m *ChunkCheckpointModel) XXX_Size() int
- func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error
- type DestroyedTableCheckpoint
- type EngineCheckpoint
- type EngineCheckpointModel
- func (*EngineCheckpointModel) Descriptor() ([]byte, []int)
- func (m *EngineCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *EngineCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *EngineCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*EngineCheckpointModel) ProtoMessage()
- func (m *EngineCheckpointModel) Reset()
- func (m *EngineCheckpointModel) Size() (n int)
- func (m *EngineCheckpointModel) String() string
- func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *EngineCheckpointModel) XXX_DiscardUnknown()
- func (m *EngineCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *EngineCheckpointModel) XXX_Merge(src proto.Message)
- func (m *EngineCheckpointModel) XXX_Size() int
- func (m *EngineCheckpointModel) XXX_Unmarshal(b []byte) error
- type FileCheckpointsDB
- func (cpdb *FileCheckpointsDB) Close() error
- func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)
- func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)
- func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error)
- func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error
- func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
- func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error
- func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)
- func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type GlueCheckpointsDB
- func (g GlueCheckpointsDB) Close() error
- func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
- func (g GlueCheckpointsDB) DumpChunks(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) DumpEngines(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) DumpTables(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
- func (g GlueCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
- func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
- func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, ...) error
- func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
- func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type MySQLCheckpointsDB
- func (cpdb *MySQLCheckpointsDB) Close() error
- func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
- func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
- func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
- func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
- func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type NullCheckpointsDB
- func (*NullCheckpointsDB) Close() error
- func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)
- func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error
- func (*NullCheckpointsDB) DumpEngines(context.Context, io.Writer) error
- func (*NullCheckpointsDB) DumpTables(context.Context, io.Writer) error
- func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error)
- func (*NullCheckpointsDB) GetLocalStoringTables(context.Context) (map[string][]int32, error)
- func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error
- func (*NullCheckpointsDB) Initialize(context.Context, *config.Config, map[string]*TidbDBInfo) error
- func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error
- func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error
- func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error
- func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff)
- type RebaseCheckpointMerger
- type Session
- type StatusCheckpointMerger
- type TableCheckpoint
- type TableCheckpointDiff
- type TableCheckpointMerger
- type TableCheckpointModel
- func (*TableCheckpointModel) Descriptor() ([]byte, []int)
- func (m *TableCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *TableCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*TableCheckpointModel) ProtoMessage()
- func (m *TableCheckpointModel) Reset()
- func (m *TableCheckpointModel) Size() (n int)
- func (m *TableCheckpointModel) String() string
- func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *TableCheckpointModel) XXX_DiscardUnknown()
- func (m *TableCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TableCheckpointModel) XXX_Merge(src proto.Message)
- func (m *TableCheckpointModel) XXX_Size() int
- func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error
- type TaskCheckpoint
- type TaskCheckpointModel
- func (*TaskCheckpointModel) Descriptor() ([]byte, []int)
- func (m *TaskCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *TaskCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *TaskCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*TaskCheckpointModel) ProtoMessage()
- func (m *TaskCheckpointModel) Reset()
- func (m *TaskCheckpointModel) Size() (n int)
- func (m *TaskCheckpointModel) String() string
- func (m *TaskCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *TaskCheckpointModel) XXX_DiscardUnknown()
- func (m *TaskCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TaskCheckpointModel) XXX_Merge(src proto.Message)
- func (m *TaskCheckpointModel) XXX_Size() int
- func (m *TaskCheckpointModel) XXX_Unmarshal(b []byte) error
- type TidbDBInfo
- type TidbTableInfo
Constants ¶
View Source
const ( // the table names to store each kind of checkpoint in the checkpoint database // remember to increase the version number in case of incompatible change. CheckpointTableNameTask = "task_v2" CheckpointTableNameTable = "table_v6" CheckpointTableNameEngine = "engine_v5" CheckpointTableNameChunk = "chunk_v5" )
View Source
const ( // shared by MySQLCheckpointsDB and GlueCheckpointsDB CreateDBTemplate = "CREATE DATABASE IF NOT EXISTS %s;" CreateTaskTableTemplate = `` /* 376-byte string literal not displayed */ CreateTableTableTemplate = `` /* 435-byte string literal not displayed */ CreateEngineTableTemplate = `` /* 338-byte string literal not displayed */ CreateChunkTableTemplate = `` /* 827-byte string literal not displayed */ InitTaskTemplate = `` /* 174-byte string literal not displayed */ InitTableTemplate = `` /* 182-byte string literal not displayed */ ReadTaskTemplate = `` /* 139-byte string literal not displayed */ ReadEngineTemplate = ` SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;` ReadChunkTemplate = `` /* 268-byte string literal not displayed */ ReadTableRemainTemplate = ` SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;` ReplaceEngineTemplate = ` REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);` ReplaceChunkTemplate = `` /* 346-byte string literal not displayed */ UpdateChunkTemplate = `` /* 168-byte string literal not displayed */ UpdateTableRebaseTemplate = ` UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;` UpdateTableStatusTemplate = ` UPDATE %s.%s SET status = ? WHERE table_name = ?;` UpdateEngineTemplate = ` UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);` DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;" )
View Source
const WholeTableEngineID = math.MaxInt32
Variables ¶
Functions ¶
func IsCheckpointTable ¶
func IsCheckpointsDBExists ¶
Types ¶
type CheckpointStatus ¶
type CheckpointStatus uint8
const ( CheckpointStatusMissing CheckpointStatus = 0 CheckpointStatusMaxInvalid CheckpointStatus = 25 CheckpointStatusLoaded CheckpointStatus = 30 CheckpointStatusAllWritten CheckpointStatus = 60 CheckpointStatusClosed CheckpointStatus = 90 CheckpointStatusImported CheckpointStatus = 120 CheckpointStatusIndexImported CheckpointStatus = 140 CheckpointStatusAlteredAutoInc CheckpointStatus = 150 CheckpointStatusChecksumSkipped CheckpointStatus = 170 CheckpointStatusChecksummed CheckpointStatus = 180 CheckpointStatusAnalyzeSkipped CheckpointStatus = 200 CheckpointStatusAnalyzed CheckpointStatus = 210 )
func (CheckpointStatus) MetricName ¶
func (status CheckpointStatus) MetricName() string
type CheckpointsDB ¶
type CheckpointsDB interface {
Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
Close() error
// InsertEngineCheckpoints initializes the checkpoints related to a table.
// It assumes the entire table has not been imported before and will fill in
// default values for the column permutations and checksums.
InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
Update(checkpointDiffs map[string]*TableCheckpointDiff)
RemoveCheckpoint(ctx context.Context, tableName string) error
// MoveCheckpoints renames the checkpoint schema by appending a suffix
// containing the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`).
MoveCheckpoints(ctx context.Context, taskID int64) error
// GetLocalStoringTables returns a map containing the tables that have engine files stored on local disk.
// Currently this is only meaningful for the local backend.
GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
DumpTables(ctx context.Context, csv io.Writer) error
DumpEngines(ctx context.Context, csv io.Writer) error
DumpChunks(ctx context.Context, csv io.Writer) error
}
func OpenCheckpointsDB ¶
type CheckpointsModel ¶
type CheckpointsModel struct {
// key is table_name
Checkpoints map[string]*TableCheckpointModel `` /* 163-byte string literal not displayed */
TaskCheckpoint *TaskCheckpointModel `protobuf:"bytes,2,opt,name=task_checkpoint,json=taskCheckpoint,proto3" json:"task_checkpoint,omitempty"`
}
func (*CheckpointsModel) Descriptor ¶
func (*CheckpointsModel) Descriptor() ([]byte, []int)
func (*CheckpointsModel) Marshal ¶
func (m *CheckpointsModel) Marshal() (dAtA []byte, err error)
func (*CheckpointsModel) MarshalToSizedBuffer ¶
func (m *CheckpointsModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*CheckpointsModel) ProtoMessage ¶
func (*CheckpointsModel) ProtoMessage()
func (*CheckpointsModel) Reset ¶
func (m *CheckpointsModel) Reset()
func (*CheckpointsModel) Size ¶
func (m *CheckpointsModel) Size() (n int)
func (*CheckpointsModel) String ¶
func (m *CheckpointsModel) String() string
func (*CheckpointsModel) Unmarshal ¶
func (m *CheckpointsModel) Unmarshal(dAtA []byte) error
func (*CheckpointsModel) XXX_DiscardUnknown ¶
func (m *CheckpointsModel) XXX_DiscardUnknown()
func (*CheckpointsModel) XXX_Marshal ¶
func (m *CheckpointsModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*CheckpointsModel) XXX_Merge ¶
func (m *CheckpointsModel) XXX_Merge(src proto.Message)
func (*CheckpointsModel) XXX_Size ¶
func (m *CheckpointsModel) XXX_Size() int
func (*CheckpointsModel) XXX_Unmarshal ¶
func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error
type ChunkCheckpoint ¶
type ChunkCheckpoint struct {
Key ChunkCheckpointKey
FileMeta mydump.SourceFileMeta
ColumnPermutation []int
Chunk mydump.Chunk
Checksum verify.KVChecksum
Timestamp int64
}
func (*ChunkCheckpoint) DeepCopy ¶
func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint
type ChunkCheckpointKey ¶
func (*ChunkCheckpointKey) String ¶
func (key *ChunkCheckpointKey) String() string
type ChunkCheckpointMerger ¶
type ChunkCheckpointMerger struct {
EngineID int32
Key ChunkCheckpointKey
Checksum verify.KVChecksum
Pos int64
RowID int64
ColumnPermutation []int
}
func (*ChunkCheckpointMerger) MergeInto ¶
func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
type ChunkCheckpointModel ¶
type ChunkCheckpointModel struct {
Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"`
ColumnPermutation []int32 `protobuf:"varint,12,rep,packed,name=column_permutation,json=columnPermutation,proto3" json:"column_permutation,omitempty"`
EndOffset int64 `protobuf:"varint,5,opt,name=end_offset,json=endOffset,proto3" json:"end_offset,omitempty"`
Pos int64 `protobuf:"varint,6,opt,name=pos,proto3" json:"pos,omitempty"`
PrevRowidMax int64 `protobuf:"varint,7,opt,name=prev_rowid_max,json=prevRowidMax,proto3" json:"prev_rowid_max,omitempty"`
RowidMax int64 `protobuf:"varint,8,opt,name=rowid_max,json=rowidMax,proto3" json:"rowid_max,omitempty"`
KvcBytes uint64 `protobuf:"varint,9,opt,name=kvc_bytes,json=kvcBytes,proto3" json:"kvc_bytes,omitempty"`
KvcKvs uint64 `protobuf:"varint,10,opt,name=kvc_kvs,json=kvcKvs,proto3" json:"kvc_kvs,omitempty"`
KvcChecksum uint64 `protobuf:"fixed64,11,opt,name=kvc_checksum,json=kvcChecksum,proto3" json:"kvc_checksum,omitempty"`
Timestamp int64 `protobuf:"fixed64,13,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Type int32 `protobuf:"varint,14,opt,name=type,proto3" json:"type,omitempty"`
Compression int32 `protobuf:"varint,15,opt,name=compression,proto3" json:"compression,omitempty"`
SortKey string `protobuf:"bytes,16,opt,name=sort_key,json=sortKey,proto3" json:"sort_key,omitempty"`
FileSize int64 `protobuf:"varint,17,opt,name=file_size,json=fileSize,proto3" json:"file_size,omitempty"`
}
func (*ChunkCheckpointModel) Descriptor ¶
func (*ChunkCheckpointModel) Descriptor() ([]byte, []int)
func (*ChunkCheckpointModel) Marshal ¶
func (m *ChunkCheckpointModel) Marshal() (dAtA []byte, err error)
func (*ChunkCheckpointModel) MarshalTo ¶
func (m *ChunkCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*ChunkCheckpointModel) MarshalToSizedBuffer ¶
func (m *ChunkCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*ChunkCheckpointModel) ProtoMessage ¶
func (*ChunkCheckpointModel) ProtoMessage()
func (*ChunkCheckpointModel) Reset ¶
func (m *ChunkCheckpointModel) Reset()
func (*ChunkCheckpointModel) Size ¶
func (m *ChunkCheckpointModel) Size() (n int)
func (*ChunkCheckpointModel) String ¶
func (m *ChunkCheckpointModel) String() string
func (*ChunkCheckpointModel) Unmarshal ¶
func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error
func (*ChunkCheckpointModel) XXX_DiscardUnknown ¶
func (m *ChunkCheckpointModel) XXX_DiscardUnknown()
func (*ChunkCheckpointModel) XXX_Marshal ¶
func (m *ChunkCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ChunkCheckpointModel) XXX_Merge ¶
func (m *ChunkCheckpointModel) XXX_Merge(src proto.Message)
func (*ChunkCheckpointModel) XXX_Size ¶
func (m *ChunkCheckpointModel) XXX_Size() int
func (*ChunkCheckpointModel) XXX_Unmarshal ¶
func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error
type EngineCheckpoint ¶
type EngineCheckpoint struct {
Status CheckpointStatus
Chunks []*ChunkCheckpoint // a sorted array
}
func (*EngineCheckpoint) DeepCopy ¶
func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint
type EngineCheckpointModel ¶
type EngineCheckpointModel struct {
Status uint32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"`
// key is "$path:$offset"
Chunks map[string]*ChunkCheckpointModel `` /* 153-byte string literal not displayed */
}
func (*EngineCheckpointModel) Descriptor ¶
func (*EngineCheckpointModel) Descriptor() ([]byte, []int)
func (*EngineCheckpointModel) Marshal ¶
func (m *EngineCheckpointModel) Marshal() (dAtA []byte, err error)
func (*EngineCheckpointModel) MarshalTo ¶
func (m *EngineCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*EngineCheckpointModel) MarshalToSizedBuffer ¶
func (m *EngineCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*EngineCheckpointModel) ProtoMessage ¶
func (*EngineCheckpointModel) ProtoMessage()
func (*EngineCheckpointModel) Reset ¶
func (m *EngineCheckpointModel) Reset()
func (*EngineCheckpointModel) Size ¶
func (m *EngineCheckpointModel) Size() (n int)
func (*EngineCheckpointModel) String ¶
func (m *EngineCheckpointModel) String() string
func (*EngineCheckpointModel) Unmarshal ¶
func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error
func (*EngineCheckpointModel) XXX_DiscardUnknown ¶
func (m *EngineCheckpointModel) XXX_DiscardUnknown()
func (*EngineCheckpointModel) XXX_Marshal ¶
func (m *EngineCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*EngineCheckpointModel) XXX_Merge ¶
func (m *EngineCheckpointModel) XXX_Merge(src proto.Message)
func (*EngineCheckpointModel) XXX_Size ¶
func (m *EngineCheckpointModel) XXX_Size() int
func (*EngineCheckpointModel) XXX_Unmarshal ¶
func (m *EngineCheckpointModel) XXX_Unmarshal(b []byte) error
type FileCheckpointsDB ¶
type FileCheckpointsDB struct {
// contains filtered or unexported fields
}
func NewFileCheckpointsDB ¶
func NewFileCheckpointsDB(path string) *FileCheckpointsDB
func (*FileCheckpointsDB) Close ¶
func (cpdb *FileCheckpointsDB) Close() error
func (*FileCheckpointsDB) DestroyErrorCheckpoint ¶
func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)
func (*FileCheckpointsDB) DumpChunks ¶
func (*FileCheckpointsDB) DumpEngines ¶
func (*FileCheckpointsDB) DumpTables ¶
func (*FileCheckpointsDB) Get ¶
func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)
func (*FileCheckpointsDB) GetLocalStoringTables ¶
func (*FileCheckpointsDB) IgnoreErrorCheckpoint ¶
func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error
func (*FileCheckpointsDB) Initialize ¶
func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (*FileCheckpointsDB) InsertEngineCheckpoints ¶
func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
func (*FileCheckpointsDB) MoveCheckpoints ¶
func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (*FileCheckpointsDB) RemoveCheckpoint ¶
func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error
func (*FileCheckpointsDB) TaskCheckpoint ¶
func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)
func (*FileCheckpointsDB) Update ¶
func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type GlueCheckpointsDB ¶
type GlueCheckpointsDB struct {
// contains filtered or unexported fields
}
GlueCheckpointsDB is almost the same as MySQLCheckpointsDB, but it uses TiDB's internal data structures, which takes considerable effort to keep consistent with database/sql. TODO: Encapsulate Begin/Commit/Rollback txn, forming SQL with args, and query/iter/scan of TiDB's RecordSet into an interface so that MySQLCheckpointsDB can be reused.
func NewGlueCheckpointsDB ¶
func (GlueCheckpointsDB) Close ¶
func (g GlueCheckpointsDB) Close() error
func (GlueCheckpointsDB) DestroyErrorCheckpoint ¶
func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
func (GlueCheckpointsDB) DumpChunks ¶
func (GlueCheckpointsDB) DumpEngines ¶
func (GlueCheckpointsDB) DumpTables ¶
func (GlueCheckpointsDB) Get ¶
func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
func (GlueCheckpointsDB) GetLocalStoringTables ¶
func (GlueCheckpointsDB) IgnoreErrorCheckpoint ¶
func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
func (GlueCheckpointsDB) Initialize ¶
func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (GlueCheckpointsDB) InsertEngineCheckpoints ¶
func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpointMap map[int32]*EngineCheckpoint) error
func (GlueCheckpointsDB) MoveCheckpoints ¶
func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (GlueCheckpointsDB) RemoveCheckpoint ¶
func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
func (GlueCheckpointsDB) TaskCheckpoint ¶
func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (GlueCheckpointsDB) Update ¶
func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type MySQLCheckpointsDB ¶
type MySQLCheckpointsDB struct {
// contains filtered or unexported fields
}
func NewMySQLCheckpointsDB ¶
func (*MySQLCheckpointsDB) Close ¶
func (cpdb *MySQLCheckpointsDB) Close() error
func (*MySQLCheckpointsDB) DestroyErrorCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
func (*MySQLCheckpointsDB) DumpChunks ¶
func (*MySQLCheckpointsDB) DumpEngines ¶
func (*MySQLCheckpointsDB) DumpTables ¶
func (*MySQLCheckpointsDB) Get ¶
func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
func (*MySQLCheckpointsDB) GetLocalStoringTables ¶
func (*MySQLCheckpointsDB) IgnoreErrorCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
func (*MySQLCheckpointsDB) Initialize ¶
func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (*MySQLCheckpointsDB) InsertEngineCheckpoints ¶
func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
func (*MySQLCheckpointsDB) MoveCheckpoints ¶
func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (*MySQLCheckpointsDB) RemoveCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
func (*MySQLCheckpointsDB) TaskCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (*MySQLCheckpointsDB) Update ¶
func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type NullCheckpointsDB ¶
type NullCheckpointsDB struct{}
NullCheckpointsDB is a checkpoints database with no checkpoints.
func NewNullCheckpointsDB ¶
func NewNullCheckpointsDB() *NullCheckpointsDB
func (*NullCheckpointsDB) Close ¶
func (*NullCheckpointsDB) Close() error
func (*NullCheckpointsDB) DestroyErrorCheckpoint ¶
func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)
func (*NullCheckpointsDB) DumpChunks ¶
func (*NullCheckpointsDB) DumpEngines ¶
func (*NullCheckpointsDB) DumpTables ¶
func (*NullCheckpointsDB) Get ¶
func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error)
func (*NullCheckpointsDB) GetLocalStoringTables ¶
func (*NullCheckpointsDB) IgnoreErrorCheckpoint ¶
func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error
func (*NullCheckpointsDB) Initialize ¶
func (*NullCheckpointsDB) Initialize(context.Context, *config.Config, map[string]*TidbDBInfo) error
func (*NullCheckpointsDB) InsertEngineCheckpoints ¶
func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error
func (*NullCheckpointsDB) MoveCheckpoints ¶
func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error
func (*NullCheckpointsDB) RemoveCheckpoint ¶
func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error
func (*NullCheckpointsDB) TaskCheckpoint ¶
func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (*NullCheckpointsDB) Update ¶
func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff)
type RebaseCheckpointMerger ¶
type RebaseCheckpointMerger struct {
AllocBase int64
}
func (*RebaseCheckpointMerger) MergeInto ¶
func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
type Session ¶
type Session interface {
Close()
Execute(context.Context, string) ([]sqlexec.RecordSet, error)
CommitTxn(context.Context) error
RollbackTxn(context.Context)
PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error)
ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error)
DropPreparedStmt(stmtID uint32) error
}
type StatusCheckpointMerger ¶
type StatusCheckpointMerger struct {
EngineID int32 // WholeTableEngineID == apply to whole table.
Status CheckpointStatus
}
func (*StatusCheckpointMerger) MergeInto ¶
func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
func (*StatusCheckpointMerger) SetInvalid ¶
func (merger *StatusCheckpointMerger) SetInvalid()
type TableCheckpoint ¶
type TableCheckpoint struct {
Status CheckpointStatus
AllocBase int64
Engines map[int32]*EngineCheckpoint
TableID int64
}
func (*TableCheckpoint) Apply ¶
func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff)
Apply the diff to the existing chunk and engine checkpoints in `cp`.
func (*TableCheckpoint) CountChunks ¶
func (cp *TableCheckpoint) CountChunks() int
func (*TableCheckpoint) DeepCopy ¶
func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint
type TableCheckpointDiff ¶
type TableCheckpointDiff struct {
// contains filtered or unexported fields
}
func NewTableCheckpointDiff ¶
func NewTableCheckpointDiff() *TableCheckpointDiff
func (*TableCheckpointDiff) String ¶
func (cpd *TableCheckpointDiff) String() string
type TableCheckpointMerger ¶
type TableCheckpointMerger interface {
// MergeInto merges a status update or chunk update into the table checkpoint diff.
// If there are multiple updates to the same table, only the last one will
// take effect. Therefore, the caller must ensure events for the same table
// are properly ordered by the global time (an old event must be merged
// before the new one).
MergeInto(cpd *TableCheckpointDiff)
}
type TableCheckpointModel ¶
type TableCheckpointModel struct {
Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"`
AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"`
Engines map[int32]*EngineCheckpointModel `` /* 158-byte string literal not displayed */
TableID int64 `protobuf:"varint,9,opt,name=tableID,proto3" json:"tableID,omitempty"`
}
func (*TableCheckpointModel) Descriptor ¶
func (*TableCheckpointModel) Descriptor() ([]byte, []int)
func (*TableCheckpointModel) Marshal ¶
func (m *TableCheckpointModel) Marshal() (dAtA []byte, err error)
func (*TableCheckpointModel) MarshalTo ¶
func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*TableCheckpointModel) MarshalToSizedBuffer ¶
func (m *TableCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*TableCheckpointModel) ProtoMessage ¶
func (*TableCheckpointModel) ProtoMessage()
func (*TableCheckpointModel) Reset ¶
func (m *TableCheckpointModel) Reset()
func (*TableCheckpointModel) Size ¶
func (m *TableCheckpointModel) Size() (n int)
func (*TableCheckpointModel) String ¶
func (m *TableCheckpointModel) String() string
func (*TableCheckpointModel) Unmarshal ¶
func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error
func (*TableCheckpointModel) XXX_DiscardUnknown ¶
func (m *TableCheckpointModel) XXX_DiscardUnknown()
func (*TableCheckpointModel) XXX_Marshal ¶
func (m *TableCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*TableCheckpointModel) XXX_Merge ¶
func (m *TableCheckpointModel) XXX_Merge(src proto.Message)
func (*TableCheckpointModel) XXX_Size ¶
func (m *TableCheckpointModel) XXX_Size() int
func (*TableCheckpointModel) XXX_Unmarshal ¶
func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error
type TaskCheckpoint ¶
type TaskCheckpointModel ¶
type TaskCheckpointModel struct {
TaskId int64 `protobuf:"varint,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
SourceDir string `protobuf:"bytes,2,opt,name=source_dir,json=sourceDir,proto3" json:"source_dir,omitempty"`
Backend string `protobuf:"bytes,3,opt,name=backend,proto3" json:"backend,omitempty"`
ImporterAddr string `protobuf:"bytes,4,opt,name=importer_addr,json=importerAddr,proto3" json:"importer_addr,omitempty"`
TidbHost string `protobuf:"bytes,5,opt,name=tidb_host,json=tidbHost,proto3" json:"tidb_host,omitempty"`
TidbPort int32 `protobuf:"varint,6,opt,name=tidb_port,json=tidbPort,proto3" json:"tidb_port,omitempty"`
PdAddr string `protobuf:"bytes,7,opt,name=pd_addr,json=pdAddr,proto3" json:"pd_addr,omitempty"`
SortedKvDir string `protobuf:"bytes,8,opt,name=sorted_kv_dir,json=sortedKvDir,proto3" json:"sorted_kv_dir,omitempty"`
LightningVer string `protobuf:"bytes,9,opt,name=lightning_ver,json=lightningVer,proto3" json:"lightning_ver,omitempty"`
}
func (*TaskCheckpointModel) Descriptor ¶
func (*TaskCheckpointModel) Descriptor() ([]byte, []int)
func (*TaskCheckpointModel) Marshal ¶
func (m *TaskCheckpointModel) Marshal() (dAtA []byte, err error)
func (*TaskCheckpointModel) MarshalTo ¶
func (m *TaskCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*TaskCheckpointModel) MarshalToSizedBuffer ¶
func (m *TaskCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*TaskCheckpointModel) ProtoMessage ¶
func (*TaskCheckpointModel) ProtoMessage()
func (*TaskCheckpointModel) Reset ¶
func (m *TaskCheckpointModel) Reset()
func (*TaskCheckpointModel) Size ¶
func (m *TaskCheckpointModel) Size() (n int)
func (*TaskCheckpointModel) String ¶
func (m *TaskCheckpointModel) String() string
func (*TaskCheckpointModel) Unmarshal ¶
func (m *TaskCheckpointModel) Unmarshal(dAtA []byte) error
func (*TaskCheckpointModel) XXX_DiscardUnknown ¶
func (m *TaskCheckpointModel) XXX_DiscardUnknown()
func (*TaskCheckpointModel) XXX_Marshal ¶
func (m *TaskCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*TaskCheckpointModel) XXX_Merge ¶
func (m *TaskCheckpointModel) XXX_Merge(src proto.Message)
func (*TaskCheckpointModel) XXX_Size ¶
func (m *TaskCheckpointModel) XXX_Size() int
func (*TaskCheckpointModel) XXX_Unmarshal ¶
func (m *TaskCheckpointModel) XXX_Unmarshal(b []byte) error
type TidbDBInfo ¶
type TidbDBInfo struct {
Name string
Tables map[string]*TidbTableInfo
}
Click to show internal directories.
Click to hide internal directories.