Documentation
¶
Index ¶
- Constants
- Variables
- func BytesMax(a, b []byte) []byte
- func BytesMin(a, b []byte) []byte
- func CalRangeSize(memPerCore int64, regionSplitSize, regionSplitKeys int64) (int64, int64)
- func CleanUpFiles(ctx context.Context, store storage.ExternalStorage, nonPartitionedDir string) error
- func DivideMergeSortDataFiles(dataFiles []string, nodeCnt int, mergeConc int) ([][]string, error)
- func GetAdjustedBlockSize(totalBufSize uint64, defBlockSize int) int
- func GetAdjustedMergeSortFileCountStep(concurrency int) int
- func GetAdjustedMergeSortOverlapThreshold(concurrency int) int64
- func GetAllFileNames(ctx context.Context, store storage.ExternalStorage, nonPartitionedDir string) ([]string, error)
- func GetMaxOverlapping(points []Endpoint) int64
- func GetMaxOverlappingTotal(stats []MultipleFilesStat) int64
- func IndexID2KVGroup(indexID int64) string
- func KVGroup2IndexID(kvGroup string) (int64, error)
- func MergeOverlappingFiles(ctx *workerpool.Context, paths []string, concurrency int, op *MergeOperator) error
- func MergeOverlappingFilesV2(ctx context.Context, multiFileStat []MultipleFilesStat, ...) (err error)
- func MockExternalEngine(storage storage.ExternalStorage, keys [][]byte, values [][]byte) (dataFiles []string, statsFiles []string, err error)
- func NewMergeCollector(ctx context.Context, summary *execute.SubtaskSummary) *mergeCollector
- func PlanMetaPath(taskID int64, step string, idx int) string
- func ReadKVFilesAsync(ctx context.Context, eg *util.ErrorGroupWithRecover, ...) chan *KVPair
- func SubtaskMetaPath(taskID int64, subtaskID int64) string
- type BaseExternalMeta
- type Endpoint
- type EndpointTp
- type Engine
- func (e *Engine) Close() error
- func (e *Engine) ConflictInfo() engineapi.ConflictInfo
- func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error)
- func (e *Engine) GetOnDup() engineapi.OnDuplicateKey
- func (e *Engine) GetRegionSplitKeys() ([][]byte, error)
- func (e *Engine) GetTotalLoadedKVsCount() int64
- func (e *Engine) ID() string
- func (e *Engine) ImportedStatistics() (importedSize int64, importedKVCount int64)
- func (e *Engine) KVStatistics() (totalKVSize int64, totalKVCount int64)
- func (e *Engine) LoadIngestData(ctx context.Context, outCh chan<- engineapi.DataAndRanges) (err error)
- func (e *Engine) Reset()
- func (e *Engine) SetWorkerPool(worker workerpool.Tuner)
- func (e *Engine) UpdateResource(ctx context.Context, concurrency int, memCapacity int64) error
- type EngineWriter
- type KVPair
- type KVReader
- type KeyValueStore
- type MemoryIngestData
- func (m *MemoryIngestData) DecRef()
- func (m *MemoryIngestData) Finish(totalBytes, totalCount int64)
- func (m *MemoryIngestData) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)
- func (m *MemoryIngestData) GetTS() uint64
- func (m *MemoryIngestData) IncRef()
- func (m *MemoryIngestData) NewIter(ctx context.Context, lowerBound, upperBound []byte, bufPool *membuf.Pool) engineapi.ForwardIter
- type MergeKVIter
- type MergeOperator
- type MergePropIter
- type MultipleFilesStat
- type OnWriterCloseFunc
- type OneFileWriter
- type RangeSplitter
- type SortedKVMeta
- type Writer
- type WriterBuilder
- func (b *WriterBuilder) Build(store storage.ExternalStorage, prefix string, writerID string) *Writer
- func (b *WriterBuilder) BuildOneFile(store storage.ExternalStorage, prefix string, writerID string) *OneFileWriter
- func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder
- func (b *WriterBuilder) SetGroupOffset(offset int) *WriterBuilder
- func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder
- func (b *WriterBuilder) SetOnCloseFunc(onClose OnWriterCloseFunc) *WriterBuilder
- func (b *WriterBuilder) SetOnDup(onDup engineapi.OnDuplicateKey) *WriterBuilder
- func (b *WriterBuilder) SetPropKeysDistance(dist uint64) *WriterBuilder
- func (b *WriterBuilder) SetPropSizeDistance(dist uint64) *WriterBuilder
- func (b *WriterBuilder) SetTiKVCodec(codec tikv.Codec) *WriterBuilder
- type WriterSummary
Constants ¶
const ( // DefaultMemSizeLimit is the default memory size limit for writer. DefaultMemSizeLimit = 256 * size.MB // DefaultBlockSize is the default block size for writer. DefaultBlockSize = 16 * units.MiB )
const ( // DataKVGroup is the group name of the sorted kv for data. // index kv will be stored in a group named as index-id. DataKVGroup = "data" )
const ( // MaxUploadPartCount defines the divisor used when calculating the size of each uploaded part. // Setting it from 10000 to 5000 increases the part size so that the total number of parts stays well below // the S3 multipart upload limit of 10,000 parts, to avoid the error "TotalPartsExceeded: exceeded total allowed configured MaxUploadParts (10000)". MaxUploadPartCount = 5000 )
Variables ¶
var ( // MaxMergingFilesPerThread is the maximum number of files that can be merged by a // single thread. This value comes from the fact that 16 threads are ok to merge 4k // files in parallel, so we set it to 250. MaxMergingFilesPerThread = 250 // MinUploadPartSize is the minimum size of each part when uploading files to // external storage, which is 5MiB for both S3 and GCS. MinUploadPartSize int64 = 5 * units.MiB )
var ( // MaxMergeSortFileCountStep is the maximum step of file count when we split the sorted kv files. Note: Use GetAdjustedMergeSortFileCountStep() instead. MaxMergeSortFileCountStep = 4000 // MergeSortMaxSubtaskTargetFiles assumes each merge sort subtask generates 16 files. MergeSortMaxSubtaskTargetFiles = 16 )
var ( // ConcurrentReaderBufferSizePerConc is the buffer size for concurrent reader per // concurrency. ConcurrentReaderBufferSizePerConc = int(8 * size.MB) )
var ( // DefaultOneWriterBlockSize is the default block size for one writer. // TODO currently we don't have per-writer mem size limit, we always use the // default mem size limit as the block size. // it's ok for now, we can make it configurable in the future. DefaultOneWriterBlockSize = int(defaultOneWriterMemSizeLimit) )
var ( // DefaultReadBufferSize is the default read buf size of KVReader. This buf is split // into 3 parts: 2 for prefetching from storage, 1 for reading by the user. DefaultReadBufferSize = 64 * units.KiB )
Functions ¶
func CalRangeSize ¶
CalRangeSize calculates the range size and range keys. see writeStepMemShareCount for more info.
func CleanUpFiles ¶
func CleanUpFiles(ctx context.Context, store storage.ExternalStorage, nonPartitionedDir string) error
CleanUpFiles deletes all data and stat files under the same non-partitioned dir. See randPartitionedPrefix for how we partition the files.
func DivideMergeSortDataFiles ¶
DivideMergeSortDataFiles divides the data files into multiple groups for merge sort. Each group will be assigned to a node for sorting. The number of files in each group is limited to MaxMergeSortFileCountStep.
func GetAdjustedBlockSize ¶
GetAdjustedBlockSize gets the block size after alignment.
func GetAdjustedMergeSortFileCountStep ¶
GetAdjustedMergeSortFileCountStep adjusts the merge sort file count step based on concurrency.
func GetAdjustedMergeSortOverlapThreshold ¶
GetAdjustedMergeSortOverlapThreshold adjusts the merge sort overlap threshold based on concurrency. The bigger the threshold, the bigger the statistical bias. On a machine with CPU:Memory = 1:2, if the concurrency is less than 8, the memory that can be used to load data is small, and loading may get blocked by the memory limiter. So we lower the threshold here if the concurrency is too low.
func GetAllFileNames ¶
func GetAllFileNames( ctx context.Context, store storage.ExternalStorage, nonPartitionedDir string, ) ([]string, error)
GetAllFileNames returns files with the same non-partitioned dir.
- for intermediate KV/stat files, we store them in a partitioned way to mitigate limitations on Cloud, see randPartitionedPrefix for how we partition the files.
- for meta files, we store them directly under the non-partitioned dir.
for example, if nonPartitionedDir is '30001', the files returned might be
- 30001/6/meta.json
- 30001/7/meta.json
- 30001/plan/ingest/1/meta.json
- 30001/plan/merge-sort/1/meta.json
- p00110000/30001/7/617527bf-e25d-4312-8784-4a4576eb0195_stat/one-file
- p00000000/30001/7/617527bf-e25d-4312-8784-4a4576eb0195/one-file
func GetMaxOverlapping ¶
GetMaxOverlapping returns the maximum overlapping weight treating given `points` as endpoints of intervals. `points` are not required to be sorted, and will be sorted in-place in this function.
func GetMaxOverlappingTotal ¶
func GetMaxOverlappingTotal(stats []MultipleFilesStat) int64
GetMaxOverlappingTotal assumes the most overlapping case from the given stats and returns the overlapping level.
func IndexID2KVGroup ¶
IndexID2KVGroup converts index id to kv group name. exported for test.
func KVGroup2IndexID ¶
KVGroup2IndexID converts index kv group name to index id.
func MergeOverlappingFiles ¶
func MergeOverlappingFiles( ctx *workerpool.Context, paths []string, concurrency int, op *MergeOperator, ) error
MergeOverlappingFiles reads from given files whose key range may overlap and writes to new sorted, nonoverlapping files.
func MergeOverlappingFilesV2 ¶
func MergeOverlappingFilesV2( ctx context.Context, multiFileStat []MultipleFilesStat, store storage.ExternalStorage, startKey []byte, endKey []byte, partSize int64, newFilePrefix string, writerID string, blockSize int, writeBatchCount uint64, propSizeDist uint64, propKeysDist uint64, onWriterClose OnWriterCloseFunc, concurrency int, checkHotspot bool, ) (err error)
MergeOverlappingFilesV2 reads from given files whose key range may overlap and writes to new sorted, nonoverlapping files. Using 1 readAllData and 1 writer.
func MockExternalEngine ¶
func MockExternalEngine( storage storage.ExternalStorage, keys [][]byte, values [][]byte, ) (dataFiles []string, statsFiles []string, err error)
MockExternalEngine generates an external engine with the given keys and values.
func NewMergeCollector ¶
func NewMergeCollector(ctx context.Context, summary *execute.SubtaskSummary) *mergeCollector
NewMergeCollector creates a new merge collector.
func PlanMetaPath ¶
PlanMetaPath returns the path of the plan meta file.
func ReadKVFilesAsync ¶
func ReadKVFilesAsync(ctx context.Context, eg *util.ErrorGroupWithRecover, store storage.ExternalStorage, files []string) chan *KVPair
ReadKVFilesAsync reads multiple KV files asynchronously and sends the KV pairs to the returned channel. The channel will be closed when reading finishes.
func SubtaskMetaPath ¶
SubtaskMetaPath returns the path of the subtask meta file.
Types ¶
type BaseExternalMeta ¶
type BaseExternalMeta struct {
// ExternalPath is the path to the external storage where the external meta is stored.
ExternalPath string
}
BaseExternalMeta is the base meta of external meta.
func (BaseExternalMeta) Marshal ¶
func (m BaseExternalMeta) Marshal(alias any) ([]byte, error)
Marshal serializes the provided alias to JSON. Usage: If ExternalPath is set, marshals using internal meta; otherwise marshals the alias directly.
func (BaseExternalMeta) ReadJSONFromExternalStorage ¶
func (m BaseExternalMeta) ReadJSONFromExternalStorage(ctx context.Context, store storage.ExternalStorage, a any) error
ReadJSONFromExternalStorage reads and unmarshals JSON from external storage into the provided alias. Usage: Retrieve external meta for further processing.
func (BaseExternalMeta) WriteJSONToExternalStorage ¶
func (m BaseExternalMeta) WriteJSONToExternalStorage(ctx context.Context, store storage.ExternalStorage, a any) error
WriteJSONToExternalStorage writes the serialized external meta JSON to external storage. Usage: Store external meta after appropriate modifications.
type Endpoint ¶
type Endpoint struct {
Key []byte
Tp EndpointTp
Weight int64 // all EndpointTp use positive weight
}
Endpoint represents an endpoint of an interval which can be used by GetMaxOverlapping.
type EndpointTp ¶
type EndpointTp int
EndpointTp is the type of Endpoint.Key.
const ( // ExclusiveEnd represents "..., Endpoint.Key)". ExclusiveEnd EndpointTp = iota // InclusiveStart represents "[Endpoint.Key, ...". InclusiveStart // InclusiveEnd represents "..., Endpoint.Key]". InclusiveEnd )
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine stores sorted key/value pairs in an external storage.
func NewExternalEngine ¶
func NewExternalEngine( ctx context.Context, storage storage.ExternalStorage, dataFiles []string, statsFiles []string, startKey []byte, endKey []byte, jobKeys [][]byte, splitKeys [][]byte, workerConcurrency int, ts uint64, totalKVSize int64, totalKVCount int64, checkHotspot bool, memCapacity int64, onDup engineapi.OnDuplicateKey, filePrefix string, ) *Engine
NewExternalEngine creates an (external) engine.
func (*Engine) ConflictInfo ¶
func (e *Engine) ConflictInfo() engineapi.ConflictInfo
ConflictInfo implements common.Engine.
func (*Engine) GetKeyRange ¶
GetKeyRange implements common.Engine.
func (*Engine) GetOnDup ¶
func (e *Engine) GetOnDup() engineapi.OnDuplicateKey
GetOnDup returns the OnDuplicateKey action for this engine.
func (*Engine) GetRegionSplitKeys ¶
GetRegionSplitKeys implements common.Engine.
func (*Engine) GetTotalLoadedKVsCount ¶
GetTotalLoadedKVsCount returns the total number of KVs loaded in LoadIngestData.
func (*Engine) ImportedStatistics ¶
ImportedStatistics returns the imported kv size and imported kv count.
func (*Engine) KVStatistics ¶
KVStatistics returns the total kv size and total kv count.
func (*Engine) LoadIngestData ¶
func (e *Engine) LoadIngestData( ctx context.Context, outCh chan<- engineapi.DataAndRanges, ) (err error)
LoadIngestData loads the data from the external storage to memory in [start, end) range, so local backend can ingest it. The byte slices used by the ingest data are allocated from Engine.bufPool and must be released by MemoryIngestData.DecRef().
func (*Engine) SetWorkerPool ¶
func (e *Engine) SetWorkerPool(worker workerpool.Tuner)
SetWorkerPool sets the worker pool for this engine.
type EngineWriter ¶
type EngineWriter struct {
// contains filtered or unexported fields
}
EngineWriter implements backend.EngineWriter interface.
func NewEngineWriter ¶
func NewEngineWriter(w *Writer) *EngineWriter
NewEngineWriter creates a new EngineWriter.
func (*EngineWriter) AppendRows ¶
AppendRows implements backend.EngineWriter interface.
func (*EngineWriter) Close ¶
func (e *EngineWriter) Close(ctx context.Context) (common.ChunkFlushStatus, error)
Close implements backend.EngineWriter interface.
func (*EngineWriter) IsSynced ¶
func (e *EngineWriter) IsSynced() bool
IsSynced implements backend.EngineWriter interface.
type KVReader ¶
type KVReader struct {
// contains filtered or unexported fields
}
KVReader reads a file in sorted KV format. the format is as follows:
- <kv-pair-block><kv-pair-block>....
- each <kv-pair-block> is <key-len><value-len><key><value>
- <key-len> and <value-len> are uint64 in big-endian
type KeyValueStore ¶
type KeyValueStore struct {
// contains filtered or unexported fields
}
KeyValueStore stores key-value pairs and maintains the range properties.
func NewKeyValueStore ¶
func NewKeyValueStore( ctx context.Context, dataWriter storage.ExternalFileWriter, rangePropertiesCollector *rangePropertiesCollector, ) *KeyValueStore
NewKeyValueStore creates a new KeyValueStore. The data will be written to the given dataWriter and range properties will be maintained in the given rangePropertiesCollector.
type MemoryIngestData ¶
type MemoryIngestData struct {
// contains filtered or unexported fields
}
MemoryIngestData is the in-memory implementation of IngestData.
func (*MemoryIngestData) DecRef ¶
func (m *MemoryIngestData) DecRef()
DecRef implements IngestData.DecRef.
func (*MemoryIngestData) Finish ¶
func (m *MemoryIngestData) Finish(totalBytes, totalCount int64)
Finish implements IngestData.Finish.
func (*MemoryIngestData) GetFirstAndLastKey ¶
func (m *MemoryIngestData) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)
GetFirstAndLastKey implements IngestData.GetFirstAndLastKey.
func (*MemoryIngestData) GetTS ¶
func (m *MemoryIngestData) GetTS() uint64
GetTS implements IngestData.GetTS.
func (*MemoryIngestData) IncRef ¶
func (m *MemoryIngestData) IncRef()
IncRef implements IngestData.IncRef.
func (*MemoryIngestData) NewIter ¶
func (m *MemoryIngestData) NewIter( ctx context.Context, lowerBound, upperBound []byte, bufPool *membuf.Pool, ) engineapi.ForwardIter
NewIter implements IngestData.NewIter.
type MergeKVIter ¶
type MergeKVIter struct {
// contains filtered or unexported fields
}
MergeKVIter is an iterator that merges multiple sorted KV pairs from different files.
func NewMergeKVIter ¶
func NewMergeKVIter( ctx context.Context, paths []string, pathsStartOffset []uint64, exStorage storage.ExternalStorage, readBufferSize int, checkHotspot bool, outerConcurrency int, ) (*MergeKVIter, error)
NewMergeKVIter creates a new MergeKVIter. The KV can be accessed by calling Next() then Key() or Values(). readBufferSize is the buffer size for each file reader, which means the total memory usage is readBufferSize * len(paths).
func (*MergeKVIter) Error ¶
func (i *MergeKVIter) Error() error
Error returns the error of the iterator.
func (*MergeKVIter) Next ¶
func (i *MergeKVIter) Next() bool
Next moves the iterator to the next position. When it returns false, the iterator is not usable.
type MergeOperator ¶
type MergeOperator struct {
*operator.AsyncOperator[*mergeMinimalTask, workerpool.None]
}
MergeOperator is the operator that merges overlapping files.
func NewMergeOperator ¶
func NewMergeOperator( ctx *workerpool.Context, store storage.ExternalStorage, partSize int64, newFilePrefix string, blockSize int, onWriterClose OnWriterCloseFunc, collector execute.Collector, concurrency int, checkHotspot bool, onDup engineapi.OnDuplicateKey, ) *MergeOperator
NewMergeOperator creates a new MergeOperator instance.
func (*MergeOperator) String ¶
func (*MergeOperator) String() string
String implements the Operator interface.
type MergePropIter ¶
type MergePropIter struct {
// contains filtered or unexported fields
}
MergePropIter is an iterator that merges multiple range properties from different files.
func NewMergePropIter ¶
func NewMergePropIter( ctx context.Context, multiStat []MultipleFilesStat, exStorage storage.ExternalStorage, ) (*MergePropIter, error)
NewMergePropIter creates a new MergePropIter.
Input MultipleFilesStat should be processed by functions like MergeOverlappingFiles to reduce overlapping to less than maxMergeSortOverlapThreshold. MergePropIter will only open the needed MultipleFilesStat and its Filenames when iterating, and the input MultipleFilesStat must guarantee that its order and its Filenames order can be processed from left to right.
func (*MergePropIter) Error ¶
func (i *MergePropIter) Error() error
Error returns the error of the iterator.
func (*MergePropIter) Next ¶
func (i *MergePropIter) Next() bool
Next moves the iterator to the next position.
type MultipleFilesStat ¶
type MultipleFilesStat struct {
MinKey tidbkv.Key `json:"min-key"`
MaxKey tidbkv.Key `json:"max-key"`
// Filenames is a list of [dataFile, statFile] pairs, and it's sorted by the
// first key of the data file.
Filenames [][2]string `json:"filenames"`
MaxOverlappingNum int64 `json:"max-overlapping-num"`
}
MultipleFilesStat is the statistic information of multiple files (currently every 500 files). It is used to estimate the data overlapping, since per-file statistic information may be too big to be loaded into memory.
type OnWriterCloseFunc ¶
type OnWriterCloseFunc func(summary *WriterSummary)
OnWriterCloseFunc is the callback function when a writer is closed.
type OneFileWriter ¶
type OneFileWriter struct {
// contains filtered or unexported fields
}
OneFileWriter is used to write data into external storage with only one file for data and stat.
func (*OneFileWriter) Close ¶
func (w *OneFileWriter) Close(ctx context.Context) error
Close closes the writer.
func (*OneFileWriter) InitPartSizeAndLogger ¶
func (w *OneFileWriter) InitPartSizeAndLogger(ctx context.Context, partSize int64)
InitPartSizeAndLogger inits the OneFileWriter and its underlying KeyValueStore.
type RangeSplitter ¶
type RangeSplitter struct {
// contains filtered or unexported fields
}
RangeSplitter is used to split key ranges of an external engine. Please see NewRangeSplitter and SplitOneRangesGroup for more details.
func NewRangeSplitter ¶
func NewRangeSplitter( ctx context.Context, multiFileStat []MultipleFilesStat, externalStorage storage.ExternalStorage, rangesGroupSize, rangesGroupKeyCnt int64, rangeJobSize, rangeJobKeyCnt int64, regionSplitSize, regionSplitKeyCnt int64, ) (*RangeSplitter, error)
NewRangeSplitter creates a new RangeSplitter to process the stat files of `multiFileStat` stored in `externalStorage`.
`rangesGroupSize` and `rangesGroupKeyCnt` control the total size and key count limit of the ranges group returned by one `SplitOneRangesGroup` invocation. The ranges group may contain multiple range jobs and region split keys. The size and keys limit of one range job are controlled by `rangeJobSize` and `rangeJobKeyCnt`. The size and keys limit of intervals of region split keys are controlled by `regionSplitSize` and `regionSplitKeyCnt`.
func (*RangeSplitter) Close ¶
func (r *RangeSplitter) Close() error
Close releases the resources of RangeSplitter.
func (*RangeSplitter) SplitOneRangesGroup ¶
func (r *RangeSplitter) SplitOneRangesGroup() ( endKeyOfGroup []byte, dataFiles []string, statFiles []string, interiorRangeJobKeys [][]byte, interiorRegionSplitKeys [][]byte, err error, )
SplitOneRangesGroup splits one ranges group, which may contain multiple range jobs and region split keys. `endKeyOfGroup` represents the end key of the group, but it will be nil when the group is the last one. `dataFiles` and `statFiles` are all the files that have overlapping key ranges in this group. `interiorRangeJobKeys` are the interior boundary keys of the range jobs, the range can be constructed with start/end key at caller. `interiorRegionSplitKeys` are the split keys that will be used later to split regions.
type SortedKVMeta ¶
type SortedKVMeta struct {
StartKey []byte `json:"start-key"`
EndKey []byte `json:"end-key"` // exclusive
TotalKVSize uint64 `json:"total-kv-size"`
TotalKVCnt uint64 `json:"total-kv-cnt"`
MultipleFilesStats []MultipleFilesStat `json:"multiple-files-stats"`
ConflictInfo engineapi.ConflictInfo `json:"conflict-info"`
}
SortedKVMeta is the meta of sorted kv.
func NewSortedKVMeta ¶
func NewSortedKVMeta(summary *WriterSummary) *SortedKVMeta
NewSortedKVMeta creates a SortedKVMeta from a WriterSummary. If the summary is empty, it will return a pointer to zero SortedKVMeta.
func (*SortedKVMeta) GetDataFiles ¶
func (m *SortedKVMeta) GetDataFiles() []string
GetDataFiles returns all data files in the meta.
func (*SortedKVMeta) GetStatFiles ¶
func (m *SortedKVMeta) GetStatFiles() []string
GetStatFiles returns all stat files in the meta.
func (*SortedKVMeta) Merge ¶
func (m *SortedKVMeta) Merge(other *SortedKVMeta)
Merge merges the other SortedKVMeta into this one.
func (*SortedKVMeta) MergeSummary ¶
func (m *SortedKVMeta) MergeSummary(summary *WriterSummary)
MergeSummary merges the WriterSummary into this SortedKVMeta.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is used to write data into external storage.
func (*Writer) LockForWrite ¶
func (w *Writer) LockForWrite() func()
LockForWrite implements ingest.Writer. Since flushKVs is thread-safe in external storage writer, this is implemented as noop.
func (*Writer) WrittenBytes ¶
WrittenBytes returns the number of bytes written by this writer.
type WriterBuilder ¶
type WriterBuilder struct {
// contains filtered or unexported fields
}
WriterBuilder builds a new Writer.
func NewWriterBuilder ¶
func NewWriterBuilder() *WriterBuilder
NewWriterBuilder creates a WriterBuilder.
func (*WriterBuilder) Build ¶
func (b *WriterBuilder) Build( store storage.ExternalStorage, prefix string, writerID string, ) *Writer
Build builds a new Writer. The files writer will create are under the prefix of "{prefix}/{writerID}".
func (*WriterBuilder) BuildOneFile ¶
func (b *WriterBuilder) BuildOneFile( store storage.ExternalStorage, prefix string, writerID string, ) *OneFileWriter
BuildOneFile builds a new one file Writer. The writer will create only one file under the prefix of "{prefix}/{writerID}".
func (*WriterBuilder) SetBlockSize ¶
func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder
SetBlockSize sets the block size of pre-allocated buf in the writer.
func (*WriterBuilder) SetGroupOffset ¶
func (b *WriterBuilder) SetGroupOffset(offset int) *WriterBuilder
SetGroupOffset sets the group offset of a writer. This can be used to group the summaries from different writers. For example, for adding multiple indexes with multi-schema-change, we use it to distinguish the summaries from different indexes.
func (*WriterBuilder) SetMemorySizeLimit ¶
func (b *WriterBuilder) SetMemorySizeLimit(size uint64) *WriterBuilder
SetMemorySizeLimit sets the memory size limit of the writer. When accumulated data size exceeds this limit, the writer will flush data as a file to external storage. When the writer is OneFileWriter SetMemorySizeLimit sets the preAllocated memory buffer size.
func (*WriterBuilder) SetOnCloseFunc ¶
func (b *WriterBuilder) SetOnCloseFunc(onClose OnWriterCloseFunc) *WriterBuilder
SetOnCloseFunc sets the callback function when a writer is closed.
func (*WriterBuilder) SetOnDup ¶
func (b *WriterBuilder) SetOnDup(onDup engineapi.OnDuplicateKey) *WriterBuilder
SetOnDup sets the action when checkDup is enabled and a duplicate key is found.
func (*WriterBuilder) SetPropKeysDistance ¶
func (b *WriterBuilder) SetPropKeysDistance(dist uint64) *WriterBuilder
SetPropKeysDistance sets the distance of range keys for each property.
func (*WriterBuilder) SetPropSizeDistance ¶
func (b *WriterBuilder) SetPropSizeDistance(dist uint64) *WriterBuilder
SetPropSizeDistance sets the distance of range size for each property.
func (*WriterBuilder) SetTiKVCodec ¶
func (b *WriterBuilder) SetTiKVCodec(codec tikv.Codec) *WriterBuilder
SetTiKVCodec sets the tikv codec of the writer.
type WriterSummary ¶
type WriterSummary struct {
WriterID string
GroupOffset int
Seq int
// Min and Max are the min and max key written by this writer, both are
// inclusive, i.e. [Min, Max].
// will be empty if no key is written.
Min tidbkv.Key
Max tidbkv.Key
// TotalSize is the total size of the KV written by this writer.
// depends on onDup setting, duplicates might not be included.
TotalSize uint64
// TotalCnt is the total count of the KV written by this writer.
// depends on onDup setting, duplicates might not be included.
TotalCnt uint64
// KVFileCount is the total count of the KV files written by this writer.
KVFileCount int
MultipleFilesStats []MultipleFilesStat
ConflictInfo engineapi.ConflictInfo
}
WriterSummary is the summary of a writer.