Documentation
Index ¶
- Constants
- Variables
- func CancelJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64) (err error)
- func CreateJob(ctx context.Context, conn sqlexec.SQLExecutor, db, table string, tableID int64, ...) (int64, error)
- func FailJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, errorMsg string, ...) error
- func FinishJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, summary *Summary) error
- func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64, importedRows int64) error
- func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor, tableSchema, tableName string) (int64, error)
- func GetBackoffWeight(plan *Plan) int
- func GetImportRootDir(tidbCfg *tidb.Config) string
- func GetIndicesGenKV(tblInfo *model.TableInfo) map[int64]GenKVIndex
- func GetNumOfIndexGenKV(tblInfo *model.TableInfo) int
- func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error)
- func GetSortStore(ctx context.Context, url string) (storage.ExternalStorage, error)
- func GetTargetNodeCPUCnt(ctx context.Context, sourceType DataSourceType, path string) (int, error)
- func Job2Step(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, step string) error
- func PostProcess(ctx context.Context, se sessionctx.Context, ...) (err error)
- func ProcessChunk(ctx context.Context, chunk *checkpoints.ChunkCheckpoint, ...) error
- func ProcessChunkWithWriter(ctx context.Context, chunk *checkpoints.ChunkCheckpoint, ...) error
- func RebaseAllocatorBases(ctx context.Context, kvStore tidbkv.Storage, ...) (err error)
- func RemoteChecksumTableBySQL(ctx context.Context, se sessionctx.Context, plan *Plan, logger *zap.Logger) (*local.RemoteChecksum, error)
- func StartJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, step string) error
- func VerifyChecksum(ctx context.Context, plan *Plan, localChecksum verify.KVChecksum, ...) error
- type ASTArgs
- type Chunk
- type ChunkProcessor
- type DataSourceType
- type FieldMapping
- type GenKVIndex
- type ImportParameters
- type IndexRouteWriter
- type JobInfo
- func GetAllViewableJobs(ctx context.Context, conn sqlexec.SQLExecutor, user string, hasSuperPriv bool) ([]*JobInfo, error)
- func GetJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, user string, ...) (*JobInfo, error)
- func GetJobsByGroupKey(ctx context.Context, conn sqlexec.SQLExecutor, user, groupKey string, ...) ([]*JobInfo, error)
- type LoadDataController
- func (e *LoadDataController) CalResourceParams(ctx context.Context, ksCodec []byte) error
- func (e *LoadDataController) CheckRequirements(ctx context.Context, se sessionctx.Context) error
- func (e *LoadDataController) Close()
- func (e *LoadDataController) CreateColAssignExprs(planCtx planctx.PlanContext) (_ []expression.Expression, _ []contextutil.SQLWarn, retErr error)
- func (e *LoadDataController) CreateColAssignSimpleExprs(ctx expression.BuildContext) (_ []expression.Expression, _ []contextutil.SQLWarn, retErr error)
- func (e *LoadDataController) FullTableName() string
- func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig
- func (e *LoadDataController) GetFieldCount() int
- func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo
- func (e *LoadDataController) GetParser(ctx context.Context, dataFileInfo LoadDataReaderInfo) (parser mydump.Parser, err error)
- func (e *LoadDataController) HandleSkipNRows(parser mydump.Parser) error
- func (e *LoadDataController) InitDataFiles(ctx context.Context) error
- func (e *LoadDataController) InitDataStore(ctx context.Context) error
- func (e *LoadDataController) InitTiKVConfigs(ctx context.Context, sctx sessionctx.Context) error
- func (e *LoadDataController) PopulateChunks(ctx context.Context) (chunksMap map[int32][]Chunk, err error)
- func (e *LoadDataController) SetExecuteNodeCnt(cnt int)
- type LoadDataReaderInfo
- type Option
- type Plan
- type QueryChunk
- type StepSummary
- type Summary
- type TableImporter
- func (ti *TableImporter) Allocators() autoid.Allocators
- func (ti *TableImporter) Backend() *local.Backend
- func (ti *TableImporter) CheckDiskQuota(ctx context.Context)
- func (ti *TableImporter) Close() error
- func (ti *TableImporter) GetKVEncoderForDupResolve() (*TableKVEncoder, error)
- func (ti *TableImporter) GetKVStore() tidbkv.Storage
- func (ti *TableImporter) GetKeySpace() []byte
- func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error)
- func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.Context) (int64, error)
- func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
- func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
- func (ti *TableImporter) SetSelectedChunkCh(ch chan QueryChunk)
- type TableKVEncoder
- type WriterFactory
Constants ¶
const (
	// DataFormatCSV represents the data source file of IMPORT INTO is csv.
	DataFormatCSV = "csv"
	// DataFormatDelimitedData delimited data.
	DataFormatDelimitedData = "delimited data"
	// DataFormatSQL represents the data source file of IMPORT INTO is mydumper-format DML file.
	DataFormatSQL = "sql"
	// DataFormatParquet represents the data source file of IMPORT INTO is parquet.
	DataFormatParquet = "parquet"
	// DataFormatAuto represents format is not set in IMPORT INTO, we will determine format automatically.
	DataFormatAuto = "auto"

	// DefaultDiskQuota is the default disk quota for IMPORT INTO.
	DefaultDiskQuota = config.ByteSize(50 << 30) // 50GiB
)
const (
	// JobStatusRunning exported since it's used in show import jobs.
	JobStatusRunning = "running"
	// JobStatusFinished exported since it's used in show import jobs.
	JobStatusFinished = "finished"
	// JobStepGlobalSorting is the first step when using global sort,
	// step goes from none -> global-sorting -> importing -> validating -> none.
	JobStepGlobalSorting = "global-sorting"
	// JobStepImporting is the first step when using local sort,
	// step goes from none -> importing -> validating -> none.
	// when used in global sort, it means importing the sorted data.
	// when used in local sort, it means encode&sort data and then importing the data.
	JobStepImporting = "importing"
	// JobStepResolvingConflicts is the step after importing to resolve conflicts,
	// it's used in global sort.
	JobStepResolvingConflicts = "resolving-conflicts"

	JobStepValidating = "validating"
)
Constants for job status and step.
Variables ¶
var (
	// DefaultMinDeliverBytes 96 KB (data + index). batch at least this amount
	// of bytes to reduce number of messages.
	DefaultMinDeliverBytes uint64 = 96 * units.KiB
	// DefaultMinDeliverRowCnt see default for tikv-importer.max-kv-pairs.
	DefaultMinDeliverRowCnt = 4096
)
Constants; declared as variables so they can be overridden in tests.
var (
	// CheckDiskQuotaInterval is the default time interval to check disk quota.
	// TODO: make it dynamically adjusting according to the speed of import and the disk size.
	CheckDiskQuotaInterval = 10 * time.Second
)
var GetEtcdClient = store.NewEtcdCli
GetEtcdClient returns an etcd client. Exported for testing.
var (
	// LoadDataReadBlockSize is exposed for test.
	LoadDataReadBlockSize = int64(config.ReadBlockSize)
)
var (
	// NewClientWithContext returns a kv.Client.
	NewClientWithContext = pd.NewClientWithContext
)
var NewTiKVModeSwitcher = local.NewTiKVModeSwitcher
NewTiKVModeSwitcher is declared as a variable so it can be mocked in tests.
var (
	// TestLastImportJobID last created job id, used in unit test.
	TestLastImportJobID atomic.Int64
)
Variables used for tests.
Functions ¶
func CancelJob ¶
CancelJob cancels an IMPORT INTO job. Only a running/paused job can be canceled. Check privileges with GetJob before calling this method.
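A minimal sketch of the cancel flow described above, assuming this package is imported as importer and that conn is a sqlexec.SQLExecutor bound to a session; import clauses are elided:

// cancelImportJob checks the caller's privilege via GetJob, then cancels the job.
func cancelImportJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, user string, hasSuperPriv bool) error {
	// GetJob fails if the user has no privilege on this job.
	if _, err := importer.GetJob(ctx, conn, jobID, user, hasSuperPriv); err != nil {
		return err
	}
	// Only a running or paused job can be canceled.
	return importer.CancelJob(ctx, conn, jobID)
}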
func CreateJob ¶
func CreateJob(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	db, table string,
	tableID int64,
	user string,
	groupKey string,
	parameters *ImportParameters,
	sourceFileSize int64,
) (int64, error)
CreateJob creates an IMPORT INTO job by inserting a record into the system table. The AUTO_INCREMENT value of that record is returned as the job ID.
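A hedged sketch of creating a job record; the schema, table, user and size values are illustrative only:

// createImportJob inserts a job record and returns the generated job ID.
func createImportJob(ctx context.Context, conn sqlexec.SQLExecutor, params *importer.ImportParameters) (int64, error) {
	return importer.CreateJob(ctx, conn,
		"test_db", "test_table", 101, // schema, table and table ID are illustrative
		"root@%", // the user who submits the job
		"",       // group key, empty when the job is not grouped
		params,
		1<<30, // total source file size in bytes
	)
}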
func FailJob ¶
func FailJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, errorMsg string, summary *Summary) error
FailJob fails an IMPORT INTO job. A job can only be failed once. It does not return an error when there is no matching job.
func FinishJob ¶
FinishJob tries to finish a running job with the given jobID: it changes the status to finished, clears the step, and updates the summary. It does not return an error when there is no matching job.
func FlushTableStats ¶
func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64, importedRows int64) error
FlushTableStats flushes the stats of the table. The stats are stored in the stats collector and applied to mysql.stats_meta in domain.UpdateTableStatsLoop with a random interval between [1, 2) minutes. These stats stay in memory until the next flush, so they might be lost if the tidb-server restarts.
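A sketch of a typical end-of-job sequence combining FinishJob and FlushTableStats; the summary content is illustrative and import clauses are elided:

// finishImportJob records the final summary and flushes table stats.
func finishImportJob(ctx context.Context, se sessionctx.Context, conn sqlexec.SQLExecutor,
	jobID, tableID, importedRows int64) error {
	summary := &importer.Summary{ImportedRows: importedRows}
	if err := importer.FinishJob(ctx, conn, jobID, summary); err != nil {
		return err
	}
	// The stats stay in memory until the next flush, as noted above.
	return importer.FlushTableStats(ctx, se, tableID, importedRows)
}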
func GetActiveJobCnt ¶
func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor, tableSchema, tableName string) (int64, error)
GetActiveJobCnt returns the count of active import jobs. Active import jobs include pending and running jobs.
func GetBackoffWeight ¶
GetBackoffWeight returns the backoff weight for the plan. It returns max(local.DefaultBackoffWeight, plan.ImportantSysVars[vardef.TiDBBackOffWeight]).
func GetImportRootDir ¶
GetImportRootDir returns the root directory for import. The directory structure looks like:

-> /path/to/tidb-tmpdir
   -> import-4000
      -> 1
         -> some-uuid

Exported for testing.
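A small usage sketch, assuming tidb refers to the TiDB server config package whose global accessor is GetGlobalConfig, and that this package is imported as importer; import clauses are elided:

// importRootDir resolves the on-disk root used for local sort, e.g.
// /path/to/tidb-tmpdir/import-4000 when the server listens on port 4000.
func importRootDir() string {
	cfg := tidb.GetGlobalConfig() // assumption: the global accessor of the aliased config package
	return importer.GetImportRootDir(cfg)
}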
func GetIndicesGenKV ¶
func GetIndicesGenKV(tblInfo *model.TableInfo) map[int64]GenKVIndex
GetIndicesGenKV gets all indices that generate index KVs.
func GetNumOfIndexGenKV ¶
GetNumOfIndexGenKV gets the number of indices that generate index KVs.
func GetRegionSplitSizeKeys ¶
func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error)
GetRegionSplitSizeKeys gets the region split size and keys from PD.
func GetSortStore ¶
GetSortStore opens the external storage at the given URL that is used for global sort.
func GetTargetNodeCPUCnt ¶
GetTargetNodeCPUCnt gets the CPU count of the target node where the IMPORT INTO job will be executed. The target node is the current node if it's a server-disk import, an import from query, or disttask is disabled; otherwise it's the node managed by disttask. Exported for testing.
func Job2Step ¶
Job2Step tries to change the step of a running job with the given jobID. It does not return an error when there is no matching job.
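A sketch of driving the local-sort step sequence from the constants above, using StartJob (documented below) and Job2Step; import clauses are elided:

// driveJobSteps sketches the local-sort step sequence: none -> importing -> validating.
func driveJobSteps(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64) error {
	if err := importer.StartJob(ctx, conn, jobID, importer.JobStepImporting); err != nil {
		return err
	}
	// ... encode, sort and ingest the data ...
	return importer.Job2Step(ctx, conn, jobID, importer.JobStepValidating)
}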
func PostProcess ¶
func PostProcess(
	ctx context.Context,
	se sessionctx.Context,
	maxIDs map[autoid.AllocatorType]int64,
	plan *Plan,
	localChecksum *verify.KVGroupChecksum,
	logger *zap.Logger,
) (err error)
PostProcess does the post-processing for the task. Exported for testing.
func ProcessChunk ¶
func ProcessChunk(
	ctx context.Context,
	chunk *checkpoints.ChunkCheckpoint,
	tableImporter *TableImporter,
	dataEngine, indexEngine *backend.OpenedEngine,
	logger *zap.Logger,
	groupChecksum *verification.KVGroupChecksum,
	collector execute.Collector,
) error
ProcessChunk processes a chunk and writes the KV pairs to dataEngine and indexEngine.
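A hedged sketch that feeds all chunks of one engine through ProcessChunk; the caller is assumed to have opened the engines via TableImporter.OpenDataEngine and OpenIndexEngine, and import clauses are elided:

// processEngineChunks encodes every chunk and writes the KV pairs into the
// already-opened data and index engines.
func processEngineChunks(
	ctx context.Context,
	ti *importer.TableImporter,
	dataEngine, indexEngine *backend.OpenedEngine,
	chunks []*checkpoints.ChunkCheckpoint,
	checksum *verification.KVGroupChecksum,
	collector execute.Collector,
	logger *zap.Logger,
) error {
	for _, chunk := range chunks {
		if err := importer.ProcessChunk(ctx, chunk, ti, dataEngine, indexEngine, logger, checksum, collector); err != nil {
			return err
		}
	}
	return nil
}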
func ProcessChunkWithWriter ¶
func ProcessChunkWithWriter(
	ctx context.Context,
	chunk *checkpoints.ChunkCheckpoint,
	tableImporter *TableImporter,
	dataWriter, indexWriter backend.EngineWriter,
	logger *zap.Logger,
	groupChecksum *verification.KVGroupChecksum,
	collector execute.Collector,
) error
ProcessChunkWithWriter processes a chunk and writes the KV pairs to dataWriter and indexWriter.
func RebaseAllocatorBases ¶
func RebaseAllocatorBases(ctx context.Context, kvStore tidbkv.Storage, maxIDs map[autoid.AllocatorType]int64, plan *Plan, logger *zap.Logger) (err error)
RebaseAllocatorBases rebases the allocator bases.
func RemoteChecksumTableBySQL ¶
func RemoteChecksumTableBySQL(ctx context.Context, se sessionctx.Context, plan *Plan, logger *zap.Logger) (*local.RemoteChecksum, error)
RemoteChecksumTableBySQL executes the SQL to get the remote checksum of the table.
func StartJob ¶
StartJob tries to start a pending job with the given jobID, changing its status to running and its step to the given step. It does not return an error when there is no matching job or the job has already started.
func VerifyChecksum ¶
func VerifyChecksum(ctx context.Context, plan *Plan, localChecksum verify.KVChecksum, logger *zap.Logger, getRemoteChecksumFn remoteChecksumFunction) error
VerifyChecksum verifies the checksum of the table.
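A sketch of wiring VerifyChecksum with RemoteChecksumTableBySQL; it assumes the unexported remoteChecksumFunction type matches RemoteChecksumTableBySQL's signature, and import clauses are elided:

// verifyImportChecksum compares the locally accumulated checksum against the
// remote checksum computed via SQL.
func verifyImportChecksum(ctx context.Context, plan *importer.Plan, localChecksum verify.KVChecksum, logger *zap.Logger) error {
	// assumption: RemoteChecksumTableBySQL satisfies the unexported remoteChecksumFunction type.
	return importer.VerifyChecksum(ctx, plan, localChecksum, logger, importer.RemoteChecksumTableBySQL)
}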
Types ¶
type ASTArgs ¶
type ASTArgs struct {
FileLocRef ast.FileLocRefTp
ColumnsAndUserVars []*ast.ColumnNameOrUserVar
ColumnAssignments []*ast.Assignment
OnDuplicate ast.OnDuplicateKeyHandlingType
FieldsInfo *ast.FieldsClause
LinesInfo *ast.LinesClause
}
ASTArgs holds the arguments for ast.LoadDataStmt. TODO: remove this struct and use a struct that can be serialized.
func ASTArgsFromImportPlan ¶
func ASTArgsFromImportPlan(plan *plannercore.ImportInto) *ASTArgs
ASTArgsFromImportPlan creates ASTArgs from plan.
func ASTArgsFromPlan ¶
func ASTArgsFromPlan(plan *plannercore.LoadData) *ASTArgs
ASTArgsFromPlan creates ASTArgs from plan.
func ASTArgsFromStmt ¶
ASTArgsFromStmt creates ASTArgs from statement.
type Chunk ¶
type Chunk struct {
Path string
FileSize int64
Offset int64
EndOffset int64
PrevRowIDMax int64
RowIDMax int64
Type mydump.SourceType
Compression mydump.Compression
Timestamp int64
ParquetMeta mydump.ParquetFileMeta
}
Chunk records the chunk information.
type ChunkProcessor ¶
ChunkProcessor is used to process a chunk of data, including encoding the data into KV pairs and delivering them to local or global storage.
func NewFileChunkProcessor ¶
func NewFileChunkProcessor(
	parser mydump.Parser,
	encoder *TableKVEncoder,
	keyspace []byte,
	chunk *checkpoints.ChunkCheckpoint,
	logger *zap.Logger,
	diskQuotaLock *syncutil.RWMutex,
	dataWriter backend.EngineWriter,
	indexWriter backend.EngineWriter,
	groupChecksum *verify.KVGroupChecksum,
	collector execute.Collector,
) ChunkProcessor
NewFileChunkProcessor creates a new local-sort chunk processor. Exported for tests.
type DataSourceType ¶
type DataSourceType string
DataSourceType indicates the data source type of IMPORT INTO.
const (
	// DataSourceTypeFile represents the data source of IMPORT INTO is file.
	// exported for test.
	DataSourceTypeFile DataSourceType = "file"
	// DataSourceTypeQuery represents the data source of IMPORT INTO is query.
	DataSourceTypeQuery DataSourceType = "query"
)
func (DataSourceType) String ¶
func (t DataSourceType) String() string
type FieldMapping ¶
type FieldMapping struct {
Column *table.Column
UserVar *ast.VariableExpr
}
FieldMapping indicates the relationship between an input field and a table column or user variable.
type GenKVIndex ¶
type GenKVIndex struct {
Unique bool
// contains filtered or unexported fields
}
GenKVIndex is used to store index info that generates index KVs.
type ImportParameters ¶
type ImportParameters struct {
ColumnsAndVars string `json:"columns-and-vars,omitempty"`
SetClause string `json:"set-clause,omitempty"`
// for s3 URL, AK/SK is redacted for security
FileLocation string `json:"file-location"`
Format string `json:"format"`
// only include what user specified, not include default value.
Options map[string]any `json:"options,omitempty"`
}
ImportParameters is the parameters of an IMPORT INTO statement. It's a minimal set of meta info stored in tidb_import_jobs for diagnosis; for detailed info, see tidb_global_tasks.
func (*ImportParameters) String ¶
func (ip *ImportParameters) String() string
String implements fmt.Stringer interface.
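An illustrative sketch of building the minimal parameter record stored in tidb_import_jobs; the values are made up and import clauses are elided:

func exampleImportParameters() {
	params := &importer.ImportParameters{
		FileLocation: "s3://bucket/prefix/*.csv", // AK/SK are redacted in real jobs
		Format:       importer.DataFormatCSV,
		Options:      map[string]any{"thread": 8}, // only user-specified options are kept
	}
	// String implements fmt.Stringer; prints the textual form stored for diagnosis.
	fmt.Println(params.String())
}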
type IndexRouteWriter ¶
type IndexRouteWriter struct {
// contains filtered or unexported fields
}
IndexRouteWriter is a writer for indexes when using global sort. KVs of different indexes are routed to different writers to make merge sort easier; otherwise the KV data of all subtasks would overlap.
func NewIndexRouteWriter ¶
func NewIndexRouteWriter(logger *zap.Logger, writerFactory WriterFactory) *IndexRouteWriter
NewIndexRouteWriter creates a new IndexRouteWriter.
func (*IndexRouteWriter) AppendRows ¶
AppendRows implements backend.EngineWriter interface.
func (*IndexRouteWriter) Close ¶
func (w *IndexRouteWriter) Close(ctx context.Context) (common.ChunkFlushStatus, error)
Close implements backend.EngineWriter interface.
func (*IndexRouteWriter) IsSynced ¶
func (*IndexRouteWriter) IsSynced() bool
IsSynced implements backend.EngineWriter interface.
type JobInfo ¶
type JobInfo struct {
ID int64
CreateTime types.Time
StartTime types.Time
UpdateTime types.Time
EndTime types.Time
TableSchema string
TableName string
TableID int64
CreatedBy string
Parameters ImportParameters
SourceFileSize int64
Status string
// Step corresponds to the `phase` field in `SHOW IMPORT JOB`
// Here we just use the same name as in distributed framework.
Step string
// The summary of the job, it will store info for each step of the import and
// will be updated when switching to a new step.
// If the ingest step is finished, the number of ingested rows will also be stored in it.
Summary *Summary
ErrorMessage string
GroupKey string
}
JobInfo is the information of import into job.
func GetAllViewableJobs ¶
func GetAllViewableJobs(ctx context.Context, conn sqlexec.SQLExecutor, user string, hasSuperPriv bool) ([]*JobInfo, error)
GetAllViewableJobs gets all viewable jobs.
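A sketch that lists every job the user may view; field access follows the JobInfo definition above, and import clauses are elided:

// listJobs prints every job the user is allowed to see.
func listJobs(ctx context.Context, conn sqlexec.SQLExecutor, user string, hasSuperPriv bool) error {
	jobs, err := importer.GetAllViewableJobs(ctx, conn, user, hasSuperPriv)
	if err != nil {
		return err
	}
	for _, job := range jobs {
		fmt.Printf("job %d on %s.%s: %s (step %s)\n",
			job.ID, job.TableSchema, job.TableName, job.Status, job.Step)
	}
	return nil
}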
func GetJob ¶
func GetJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, user string, hasSuperPriv bool) (*JobInfo, error)
GetJob returns the job with the given ID if the user has the privilege to view it. hasSuperPriv indicates whether the user has the SUPER privilege: with it, the user can show or operate all jobs; otherwise only their own jobs.
func GetJobsByGroupKey ¶
func GetJobsByGroupKey(ctx context.Context, conn sqlexec.SQLExecutor, user, groupKey string, hasSuperPriv bool) ([]*JobInfo, error)
GetJobsByGroupKey gets the jobs with the given group key. If the group key is not specified, it returns all jobs that have a group key set.
type LoadDataController ¶
type LoadDataController struct {
*Plan
*ASTArgs
Table table.Table
// how each input field (or input column) from the data file is mapped, either to a column or a variable.
// if there's NO column list clause in the SQL statement, then it's the table's columns,
// else it's the user-defined list.
FieldMappings []*FieldMapping
// InsertColumns is the columns stated in the SQL statement to insert.
// as IMPORT INTO has 2 places to state columns, in column-vars and in the set clause,
// it's computed from both clauses:
//   - append columns from column-vars to InsertColumns
//   - append columns from the left-hand side of the set clause to InsertColumns
// it's similar to InsertValues.InsertColumns.
// Note: our behavior is different from MySQL. For example, for table t(a,b):
//   - "...(a,a) set a=100" is allowed in MySQL, but not in TiDB
//   - "...(a,b) set b=100" sets b=100 in MySQL, but in TiDB the set is ignored.
//   - referencing columns in the set clause is allowed in MySQL, but not in TiDB
InsertColumns []*table.Column
// ExecuteNodesCnt is the count of execute nodes.
ExecuteNodesCnt int
// contains filtered or unexported fields
}
LoadDataController is the controller for LOAD DATA and IMPORT INTO. TODO: needs a better name.
func NewLoadDataController ¶
func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs, options ...Option) (*LoadDataController, error)
NewLoadDataController creates a new controller.
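A sketch of wiring a controller for a file-based import, assuming the plan and AST arguments were built with NewImportPlan and ASTArgsFromImportPlan; import clauses are elided:

// newFileController builds a controller and enumerates the source data files.
func newFileController(ctx context.Context, plan *importer.Plan, tbl table.Table, astArgs *importer.ASTArgs) (*importer.LoadDataController, error) {
	ctrl, err := importer.NewLoadDataController(plan, tbl, astArgs)
	if err != nil {
		return nil, err
	}
	// Resolve the storage backend and enumerate the source data files.
	if err := ctrl.InitDataFiles(ctx); err != nil {
		return nil, err
	}
	return ctrl, nil
}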
func (*LoadDataController) CalResourceParams ¶
func (e *LoadDataController) CalResourceParams(ctx context.Context, ksCodec []byte) error
CalResourceParams calculates resource-related parameters according to the total file size and the target node CPU count.
func (*LoadDataController) CheckRequirements ¶
func (e *LoadDataController) CheckRequirements(ctx context.Context, se sessionctx.Context) error
CheckRequirements checks the requirements for IMPORT INTO. We check the following things here:

- when importing from a file:
  1. there is no active job on the target table
  2. the total file size > 0
  3. if using global sort, the required privileges are granted
- the target table should be empty
- no CDC or PiTR tasks are running

We check them one by one and return the first error we meet.
func (*LoadDataController) Close ¶
func (e *LoadDataController) Close()
Close closes all the resources.
func (*LoadDataController) CreateColAssignExprs ¶
func (e *LoadDataController) CreateColAssignExprs(planCtx planctx.PlanContext) (_ []expression.Expression, _ []contextutil.SQLWarn, retErr error)
CreateColAssignExprs creates the column assignment expressions using the session context. RewriteAstExpr writes the AST node in place (due to xxNode.Accept), but it doesn't change the node content, so we sync it.
func (*LoadDataController) CreateColAssignSimpleExprs ¶
func (e *LoadDataController) CreateColAssignSimpleExprs(ctx expression.BuildContext) (_ []expression.Expression, _ []contextutil.SQLWarn, retErr error)
CreateColAssignSimpleExprs creates the column assignment expressions using `expression.BuildContext`. This method does not support:
- Subquery
- System Variables (e.g. `@@tidb_enable_async_commit`)
- Window functions
- Aggregate functions
- Other special functions used in some specified queries such as `GROUPING`, `VALUES` ...
func (*LoadDataController) FullTableName ¶
func (e *LoadDataController) FullTableName() string
FullTableName returns the fully qualified name of the table.
func (*LoadDataController) GenerateCSVConfig ¶
func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig
GenerateCSVConfig generates a CSV config for parser from LoadDataWorker.
func (*LoadDataController) GetFieldCount ¶
func (e *LoadDataController) GetFieldCount() int
GetFieldCount gets the field count.
func (*LoadDataController) GetLoadDataReaderInfos ¶
func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo
GetLoadDataReaderInfos returns the LoadDataReaderInfo for each data file.
func (*LoadDataController) GetParser ¶
func (e *LoadDataController) GetParser(ctx context.Context, dataFileInfo LoadDataReaderInfo) (parser mydump.Parser, err error)
GetParser returns a parser for the data file.
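A sketch that opens a parser for every data file, assuming InitDataFiles has already been called on the controller; import clauses are elided:

// readAllFiles opens a parser per data file and skips configured header rows.
func readAllFiles(ctx context.Context, ctrl *importer.LoadDataController) error {
	for _, info := range ctrl.GetLoadDataReaderInfos() {
		parser, err := ctrl.GetParser(ctx, info)
		if err != nil {
			return err
		}
		if err := ctrl.HandleSkipNRows(parser); err != nil {
			parser.Close()
			return err
		}
		// ... iterate rows, e.g. with parser.ReadRow() ...
		parser.Close()
	}
	return nil
}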
func (*LoadDataController) HandleSkipNRows ¶
func (e *LoadDataController) HandleSkipNRows(parser mydump.Parser) error
HandleSkipNRows skips the first N rows of the data file.
func (*LoadDataController) InitDataFiles ¶
func (e *LoadDataController) InitDataFiles(ctx context.Context) error
InitDataFiles initializes the data store and files. It calls InitDataStore internally.
func (*LoadDataController) InitDataStore ¶
func (e *LoadDataController) InitDataStore(ctx context.Context) error
InitDataStore initializes the data store.
func (*LoadDataController) InitTiKVConfigs ¶
func (e *LoadDataController) InitTiKVConfigs(ctx context.Context, sctx sessionctx.Context) error
InitTiKVConfigs initializes some TiKV related configs.
func (*LoadDataController) PopulateChunks ¶
func (e *LoadDataController) PopulateChunks(ctx context.Context) (chunksMap map[int32][]Chunk, err error)
PopulateChunks populates chunks from table regions. In the dist framework, this should be done on the TiDB node that is responsible for splitting the job into subtasks; the table-importer then handles the data belonging to each subtask.
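A small sketch inspecting the chunks produced per engine; import clauses are elided:

// splitIntoChunks groups the source data into per-engine chunks, as the node
// responsible for splitting the job into subtasks would do.
func splitIntoChunks(ctx context.Context, ctrl *importer.LoadDataController) error {
	chunksMap, err := ctrl.PopulateChunks(ctx)
	if err != nil {
		return err
	}
	for engineID, chunks := range chunksMap {
		fmt.Printf("engine %d: %d chunks\n", engineID, len(chunks))
	}
	return nil
}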
func (*LoadDataController) SetExecuteNodeCnt ¶
func (e *LoadDataController) SetExecuteNodeCnt(cnt int)
SetExecuteNodeCnt sets the execute node count.
type LoadDataReaderInfo ¶
type LoadDataReaderInfo struct {
// Opener can be called when needed to get an io.ReadSeekCloser. It will only
// be called once.
Opener func(ctx context.Context) (io.ReadSeekCloser, error)
// Remote is not nil only if load from cloud storage.
Remote *mydump.SourceFileMeta
}
LoadDataReaderInfo provides information for a data reader of LOAD DATA.
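A sketch honoring the Opener contract above (call it at most once, and close the returned reader); import clauses are elided:

// openOnce obtains the reader for one data file and closes it when done.
func openOnce(ctx context.Context, info importer.LoadDataReaderInfo) error {
	r, err := info.Opener(ctx)
	if err != nil {
		return err
	}
	defer r.Close()
	// ... read or seek within the data file ...
	return nil
}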
type Option ¶
type Option func(c *LoadDataController)
Option is used to set optional parameters for LoadDataController.
func WithLogger ¶
WithLogger sets the logger for LoadDataController.
type Plan ¶
type Plan struct {
DBName string
DBID int64
// TableInfo is the table info we use during import; we might change it
// if adding indexes by SQL is enabled (it's disabled now).
TableInfo *model.TableInfo
// DesiredTableInfo is the table info before import, and the desired table info
// after import.
DesiredTableInfo *model.TableInfo
Path string
// only effective when data source is file.
Format string
// Data interpretation is restrictive if the SQL mode is restrictive and neither
// the IGNORE nor the LOCAL modifier is specified. Errors terminate the load
// operation.
// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
Restrictive bool
// Location is used to convert time type for parquet, see
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
Location *time.Location
SQLMode mysql.SQLMode
// Charset is the charset of the data file when file is CSV or TSV.
// it might be nil when using LOAD DATA and no charset is specified.
// for IMPORT INTO, it is always non-nil and defaults to defaultCharacterSet.
Charset *string
ImportantSysVars map[string]string
// used for LOAD DATA and CSV format of IMPORT INTO
FieldNullDef []string
// this is not used in IMPORT INTO
NullValueOptEnclosed bool
// LinesStartingBy is not used in IMPORT INTO
// FieldsOptEnclosed is not used in either IMPORT INTO or LOAD DATA
plannercore.LineFieldsInfo
IgnoreLines uint64
DiskQuota config.ByteSize
Checksum config.PostOpLevel
ThreadCnt int
MaxNodeCnt int
MaxWriteSpeed config.ByteSize
SplitFile bool
MaxRecordedErrors int64
Detached bool
DisableTiKVImportMode bool
MaxEngineSize config.ByteSize
CloudStorageURI string
DisablePrecheck bool
GroupKey string
// used for checksum in physical mode
DistSQLScanConcurrency int
// todo: remove it when load data code is reverted.
InImportInto bool
DataSourceType DataSourceType
// only initialized for IMPORT INTO, used when creating job.
Parameters *ImportParameters `json:"-"`
// the user who executes the statement, in the form of user@host
// only initialized for IMPORT INTO
User string `json:"-"`
IsRaftKV2 bool
// total data file size in bytes.
TotalFileSize int64
// used in tests to force enable merge-step when using global sort.
ForceMergeStep bool
// see ManualRecovery in proto.ExtraParams
ManualRecovery bool
// the keyspace name when submitting this job, only for import-into
Keyspace string
// contains filtered or unexported fields
}
Plan describes the plan of LOAD DATA and IMPORT INTO.
func NewImportPlan ¶
func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) (*Plan, error)
NewImportPlan creates a new import into plan.
func NewPlanFromLoadDataPlan ¶
func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.LoadData) (*Plan, error)
NewPlanFromLoadDataPlan creates an import plan from LOAD DATA.
func (*Plan) IsGlobalSort ¶
IsGlobalSort returns true if we sort data on global storage.
func (*Plan) IsLocalSort ¶
IsLocalSort returns true if we sort data on local disk.
type QueryChunk ¶
QueryChunk is a chunk from query result.
type StepSummary ¶
type StepSummary struct {
Bytes int64 `json:"input-bytes,omitempty"`
RowCnt int64 `json:"input-rows,omitempty"`
}
StepSummary records the number of data involved in each step. The data stored might be inaccurate, such as the number of rows in encode step.
type Summary ¶
type Summary struct {
// EncodeSummary stores the bytes and rows needed to be processed in encode step.
// Same for other summaries.
EncodeSummary StepSummary `json:"encode-summary,omitempty"`
MergeSummary StepSummary `json:"merge-summary,omitempty"`
IngestSummary StepSummary `json:"ingest-summary,omitempty"`
// ImportedRows is the number of rows imported into TiKV.
// conflicted rows are excluded from this count if using global-sort.
ImportedRows int64 `json:"row-count,omitempty"`
ConflictRowCnt uint64 `json:"conflict-row-count,omitempty"`
// TooManyConflicts indicates there are too many conflicted rows that we
// cannot deduplicate during collecting its checksum, so we will skip later
// checksum step.
TooManyConflicts bool `json:"too-many-conflicts,omitempty"`
}
Summary records the amount of data to be processed in each step of the import job. This information is saved into the tidb_import_jobs table after the job finishes.
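Given the JSON tags above, a sketch of how a finished job's summary might serialize; the exact persistence format is an assumption, and import clauses are elided:

// printSummary shows the JSON shape implied by the struct tags above.
func printSummary() {
	s := &importer.Summary{
		IngestSummary: importer.StepSummary{Bytes: 1 << 30, RowCnt: 1_000_000},
		ImportedRows:  1_000_000,
	}
	data, _ := json.Marshal(s)
	fmt.Println(string(data)) // keys follow the tags above, e.g. "ingest-summary", "row-count"
}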
type TableImporter ¶
type TableImporter struct {
*LoadDataController
// contains filtered or unexported fields
}
TableImporter is a table importer.
func NewTableImporter ¶
func NewTableImporter(ctx context.Context, e *LoadDataController, id string, kvStore tidbkv.Storage) (ti *TableImporter, err error)
NewTableImporter creates a new table importer.
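A hedged sketch of the local-sort engine lifecycle built on TableImporter; it assumes (*backend.OpenedEngine).Close returns a *backend.ClosedEngine, as in the lightning backend this package builds on, and import clauses are elided:

// importOneEngine opens a data engine, fills it with KV pairs (see the
// ProcessChunk sketch above), closes it, then ingests and cleans up.
func importOneEngine(ctx context.Context, ti *importer.TableImporter, engineID int32) (int64, error) {
	dataEngine, err := ti.OpenDataEngine(ctx, engineID)
	if err != nil {
		return 0, err
	}
	// An index engine is typically opened via OpenIndexEngine and imported
	// after all data engines; omitted in this sketch.

	// ... encode chunks and write KV pairs into dataEngine, e.g. via ProcessChunk ...

	closedEngine, err := dataEngine.Close(ctx) // assumption: Close returns a *backend.ClosedEngine
	if err != nil {
		return 0, err
	}
	// Ingest the sorted data into TiKV and remove the local engine files.
	return ti.ImportAndCleanup(ctx, closedEngine)
}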
func NewTableImporterForTest ¶
func NewTableImporterForTest(ctx context.Context, e *LoadDataController, id string, kvStore tidbkv.Storage) (*TableImporter, error)
NewTableImporterForTest creates a new table importer for test.
func (*TableImporter) Allocators ¶
func (ti *TableImporter) Allocators() autoid.Allocators
Allocators returns allocators used to record max used ID, i.e. PanickingAllocators.
func (*TableImporter) Backend ¶
func (ti *TableImporter) Backend() *local.Backend
Backend returns the backend of the importer.
func (*TableImporter) CheckDiskQuota ¶
func (ti *TableImporter) CheckDiskQuota(ctx context.Context)
CheckDiskQuota checks disk quota.
func (*TableImporter) Close ¶
func (ti *TableImporter) Close() error
Close implements the io.Closer interface.
func (*TableImporter) GetKVEncoderForDupResolve ¶
func (ti *TableImporter) GetKVEncoderForDupResolve() (*TableKVEncoder, error)
GetKVEncoderForDupResolve gets the KV encoder for duplicate resolution.
func (*TableImporter) GetKVStore ¶
func (ti *TableImporter) GetKVStore() tidbkv.Storage
GetKVStore gets the kv store.
func (*TableImporter) GetKeySpace ¶
func (ti *TableImporter) GetKeySpace() []byte
GetKeySpace gets the keyspace of the kv store.
func (*TableImporter) ImportAndCleanup ¶
func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error)
ImportAndCleanup imports the engine and cleanup the engine data.
func (*TableImporter) ImportSelectedRows ¶
func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.Context) (int64, error)
ImportSelectedRows imports selected rows.
func (*TableImporter) OpenDataEngine ¶
func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
OpenDataEngine opens a data engine.
func (*TableImporter) OpenIndexEngine ¶
func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
OpenIndexEngine opens an index engine.
func (*TableImporter) SetSelectedChunkCh ¶
func (ti *TableImporter) SetSelectedChunkCh(ch chan QueryChunk)
SetSelectedChunkCh sets the channel to receive selected rows.
type TableKVEncoder ¶
type TableKVEncoder struct {
*kv.BaseKVEncoder
// contains filtered or unexported fields
}
TableKVEncoder encodes a row of data into a KV pair.
func NewTableKVEncoder ¶
func NewTableKVEncoder(config *encode.EncodingConfig, ctrl *LoadDataController) (*TableKVEncoder, error)
NewTableKVEncoder creates a new TableKVEncoder. Exported for tests.
func NewTableKVEncoderForDupResolve ¶
func NewTableKVEncoderForDupResolve(config *encode.EncodingConfig, ctrl *LoadDataController) (*TableKVEncoder, error)
NewTableKVEncoderForDupResolve creates a new TableKVEncoder for duplicate resolution.