Documentation
¶
Index ¶
- func MakeUUID(tableName string, engineID int64) (string, uuid.UUID)
- type Backend
- type CheckCtx
- type ChunkFlushStatus
- type ClosedEngine
- type EngineConfig
- type EngineFileSize
- type EngineManager
- func (be EngineManager) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error)
- func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error)
- func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, ...) (*ClosedEngine, error)
- type EngineWriter
- type ExternalEngineConfig
- type LocalEngineConfig
- type LocalWriterConfig
- type OpenedEngine
- func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error)
- func (engine *OpenedEngine) Flush(ctx context.Context) error
- func (engine *OpenedEngine) GetEngineUUID() uuid.UUID
- func (en OpenedEngine) GetID() int32
- func (en OpenedEngine) GetUUID() uuid.UUID
- func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error)
- type TargetInfoGetter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Backend ¶
// Backend is the interface that every import backend implements. All engine
// operations identify the engine by its UUID.
type Backend interface {
// Close the connection to the backend.
Close()
// RetryImportDelay returns the duration to sleep when retrying an import.
RetryImportDelay() time.Duration
// ShouldPostProcess returns whether KV-specific post-processing should be
// performed for this backend. Post-processing includes checksum and analyze.
ShouldPostProcess() bool
// OpenEngine opens the engine identified by engineUUID with the given config.
// NOTE(review): the exact effect is backend-specific and not visible here.
OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
// CloseEngine closes the engine identified by engineUUID with the given config.
// NOTE(review): presumably this must happen before ImportEngine — confirm
// against the implementations.
CloseEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
// ImportEngine imports engine data to the backend. If it returns
// ErrDuplicateDetected, a duplicate was detected. In this situation, all data
// in the engine must be imported. It's safe to reset or clean up this engine.
// NOTE(review): regionSplitSize and regionSplitKeys presumably control how
// regions are split during import — confirm.
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
// CleanupEngine cleans up the engine identified by engineUUID.
// NOTE(review): presumably removes the engine's intermediate data — confirm.
CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
// FlushEngine ensures all KV pairs written to an open engine have been
// synchronized, such that kill-9'ing Lightning afterwards and resuming from
// checkpoint can recover the exact same content.
//
// This method is only relevant for local backend, and is no-op for all
// other backends.
FlushEngine(ctx context.Context, engineUUID uuid.UUID) error
// FlushAllEngines performs FlushEngine on all opened engines. This is a
// very expensive operation and should only be used in some rare situations
// (e.g. preparing to resolve a disk quota violation).
FlushAllEngines(ctx context.Context) error
// ResetEngine clears all written KV pairs in this opened engine.
ResetEngine(ctx context.Context, engineUUID uuid.UUID) error
// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
LocalWriter(ctx context.Context, cfg *LocalWriterConfig, engineUUID uuid.UUID) (EngineWriter, error)
}
Backend defines the interface for a backend. Implementations of this interface must be goroutine safe: you can share an instance and execute any method anywhere. Usual workflow:
- Create a `Backend` for the whole process.
- For each table,
  i. Split into multiple "batches" consisting of data files with roughly equal total size.
  ii. For each batch,
    a. Create an `OpenedEngine` via `backend.OpenEngine()`
    b. For each chunk, deliver data into the engine via `engine.WriteRows()`
    c. When all chunks are written, obtain a `ClosedEngine` via `engine.Close()`
    d. Import data via `engine.Import()`
    e. Cleanup via `engine.Cleanup()`
- Close the connection via `backend.Close()`
type CheckCtx ¶
// CheckCtx contains all parameters used in CheckRequirements.
type CheckCtx struct {
// DBMetas is the list of database metadata passed to the check.
// NOTE(review): presumably the source databases to be imported — confirm.
DBMetas []*mydump.MDDatabaseMeta
}
CheckCtx contains all parameters used in CheckRequirements
type ChunkFlushStatus ¶
// ChunkFlushStatus is the status of a chunk flush.
type ChunkFlushStatus interface {
// Flushed reports the flush state as a boolean.
// NOTE(review): presumably true once the chunk's data has been flushed — confirm.
Flushed() bool
}
ChunkFlushStatus is the status of a chunk flush.
type ClosedEngine ¶
// ClosedEngine represents a closed engine, allowing ingestion into the target.
type ClosedEngine struct {
// contains filtered or unexported fields
}
ClosedEngine represents a closed engine, allowing ingestion into the target. This type is goroutine safe: you can share an instance and execute any method anywhere.
func NewClosedEngine ¶
NewClosedEngine creates a new ClosedEngine.
func (*ClosedEngine) Cleanup ¶
func (engine *ClosedEngine) Cleanup(ctx context.Context) error
Cleanup deletes the intermediate data from target.
func (*ClosedEngine) Import ¶
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error
Import the data written to the engine into the target.
func (*ClosedEngine) Logger ¶
func (engine *ClosedEngine) Logger() log.Logger
Logger returns the logger for the engine.
type EngineConfig ¶
// EngineConfig defines the configuration used to open an engine.
type EngineConfig struct {
// TableInfo is the corresponding TiDB table info.
TableInfo *checkpoints.TidbTableInfo
// Local is the local-backend-specific configuration.
Local LocalEngineConfig
// External is the configuration specific to the local backend's external
// engine.
External *ExternalEngineConfig
// KeepSortDir indicates whether to keep the temporary sort directory
// when opening the engine, instead of removing it.
KeepSortDir bool
// TS is the preset timestamp of data in the engine. When it's 0, the used TS
// will be set lazily. This is used by the local backend. This field will be
// written to engineMeta.TS and takes effect in the following cases:
// - engineManager.openEngine
// - engineManager.closeEngine only for an external engine
TS uint64
}
EngineConfig defines the configuration used to open an engine.
type EngineFileSize ¶
// EngineFileSize represents the size of an engine on disk and in memory.
type EngineFileSize struct {
// UUID is the engine's UUID.
UUID uuid.UUID
// DiskSize is the estimated total file size on disk right now.
DiskSize int64
// MemSize is the total memory size used by the engine. This is the
// estimated additional size saved onto disk after calling Flush().
MemSize int64
// IsImporting indicates whether the engine is performing Import().
IsImporting bool
}
EngineFileSize represents the size of an engine on disk and in memory.
type EngineManager ¶
// EngineManager is the manager of engines; it wraps a Backend and provides
// common methods for managing engines.
type EngineManager struct {
// contains filtered or unexported fields
}
EngineManager is the manager of engines. It is a wrapper of Backend that provides some common methods for managing engines. It has no state and can be created on demand.
func MakeEngineManager ¶
func MakeEngineManager(ab Backend) EngineManager
MakeEngineManager creates a new EngineManager from a Backend.
func (EngineManager) OpenEngine ¶
func (be EngineManager) OpenEngine( ctx context.Context, config *EngineConfig, tableName string, engineID int32, ) (*OpenedEngine, error)
OpenEngine opens an engine with the given table name and engine ID.
func (EngineManager) UnsafeCloseEngine ¶
func (be EngineManager) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tableName string, engineID int32) (*ClosedEngine, error)
UnsafeCloseEngine closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.
func (EngineManager) UnsafeCloseEngineWithUUID ¶
func (be EngineManager) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID, id int32) (*ClosedEngine, error)
UnsafeCloseEngineWithUUID closes the engine without first opening it. This method is "unsafe" as it does not follow the normal operation sequence (Open -> Write -> Close -> Import). This method should only be used when one knows via other ways that the engine has already been opened, e.g. when resuming from a checkpoint.
type EngineWriter ¶
// EngineWriter is the interface for writing data to an engine.
type EngineWriter interface {
// AppendRows writes the given rows to the engine.
// NOTE(review): columnNames presumably names the columns carried by rows,
// in order — confirm against the implementations.
AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error
// IsSynced reports the writer's sync state as a boolean.
// NOTE(review): presumably true when everything appended so far has been
// synchronized — confirm.
IsSynced() bool
// Close closes the writer and returns a ChunkFlushStatus for it, or an error.
Close(ctx context.Context) (ChunkFlushStatus, error)
}
EngineWriter is the interface for writing data to an engine.
type ExternalEngineConfig ¶
// ExternalEngineConfig is the configuration used for the local backend's
// external engine.
type ExternalEngineConfig struct {
// StorageURI is the URI of the storage used by the external engine.
// NOTE(review): presumably where DataFiles and StatFiles live — confirm.
StorageURI string
// DataFiles lists the data files of the engine.
// NOTE(review): presumably paths relative to StorageURI — confirm.
DataFiles []string
// StatFiles lists the stat files of the engine.
// NOTE(review): presumably statistics accompanying DataFiles — confirm.
StatFiles []string
// StartKey and EndKey bound the key range of the engine.
// NOTE(review): inclusive/exclusive semantics are not visible here — confirm.
StartKey []byte
EndKey []byte
// JobKeys is a list of keys.
// NOTE(review): presumably boundaries dividing the range into jobs — confirm.
JobKeys [][]byte
// SplitKeys is a list of keys.
// NOTE(review): presumably the keys at which regions are split — confirm.
SplitKeys [][]byte
// TotalFileSize can be an estimated value.
TotalFileSize int64
// TotalKVCount can be an estimated value.
TotalKVCount int64
// CheckHotspot is a boolean switch.
// NOTE(review): presumably enables hotspot checking when reading the
// files — confirm.
CheckHotspot bool
// MemCapacity is the memory capacity for the whole subtask.
MemCapacity int64
// OnDup is the action when a duplicate key is found during global sort.
OnDup common.OnDuplicateKey
}
ExternalEngineConfig is the configuration used for local backend external engine.
type LocalEngineConfig ¶
// LocalEngineConfig is the configuration used for the local backend in
// OpenEngine.
type LocalEngineConfig struct {
// Compact indicates whether to compact small SSTs before ingesting them
// into pebble.
Compact bool
// CompactThreshold is the raw KV size threshold that triggers compaction.
CompactThreshold int64
// CompactConcurrency is the concurrency of the compact routine.
CompactConcurrency int
// BlockSize is the block size.
// NOTE(review): presumably the block size of the engine's underlying
// storage — confirm the unit and what it applies to.
BlockSize int
}
LocalEngineConfig is the configuration used for local backend in OpenEngine.
type LocalWriterConfig ¶
// LocalWriterConfig defines the configuration used to open a LocalWriter.
type LocalWriterConfig struct {
// Local is the local-backend-specific configuration.
Local struct {
// IsKVSorted indicates whether the chunk KVs written to this LocalWriter
// are sent in order.
IsKVSorted bool
// MemCacheSize specifies the estimated memory cache limit used by this local
// writer. It has higher priority than BackendConfig.LocalWriterMemCacheSize if
// set.
MemCacheSize int64
}
// TiDB is the TiDB-backend-specific configuration.
TiDB struct {
// TableName is a table name for the TiDB backend.
// NOTE(review): presumably the target table the writer writes into — confirm.
TableName string
}
}
LocalWriterConfig defines the configuration to open a LocalWriter
type OpenedEngine ¶
// OpenedEngine is an opened engine, to which data can be written.
type OpenedEngine struct {
// contains filtered or unexported fields
}
OpenedEngine is an opened engine, allowing data to be written via WriteRows. This type is goroutine safe: you can share an instance and execute any method anywhere.
func (*OpenedEngine) Close ¶
func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error)
Close the opened engine to prepare it for importing.
func (*OpenedEngine) Flush ¶
func (engine *OpenedEngine) Flush(ctx context.Context) error
Flush flushes the currently written data for the local backend.
func (*OpenedEngine) GetEngineUUID ¶
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID
GetEngineUUID returns the engine UUID.
func (*OpenedEngine) LocalWriter ¶
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (EngineWriter, error)
LocalWriter returns a writer that writes to the local backend.
type TargetInfoGetter ¶
// TargetInfoGetter defines the interfaces to get target information.
type TargetInfoGetter interface {
// FetchRemoteDBModels obtains the models of all databases. Currently, only
// the database name is filled.
FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)
// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder
// does not require it, but must at least fill in the following fields for
// TablesFromMeta to succeed:
//   - Name
//   - State (must be model.StatePublic)
//   - ID
//   - Columns
//       * Name
//       * State (must be model.StatePublic)
//       * Offset (must be 0, 1, 2, ...)
//   - PKIsHandle (true = do not generate _tidb_rowid)
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
// CheckRequirements checks whether the backend satisfies the version
// requirements.
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}
TargetInfoGetter defines the interfaces to get target information.