Documentation
¶
Overview ¶
Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func EvictGroupLruCache(key BlockCacheKey, value *BlockCacheValue)
- func EvictPieceLruCache(key BlockCacheKey, value *BlockCacheValue)
- func FlushVerticalFilter(filterVerBuffer, filterLogBuffer []byte) []byte
- func GenSchemaIdxs(schema record.Schemas, skipIndexRelation *influxql.IndexRelation, ...) []int
- func GetAdaptiveTimeBucket(startTime, endTime time.Time, ascending bool) time.Duration
- func GetAggLogIntervalList() []int
- func GetBinaryMask(segCnt uint32) (uint32, uint32)
- func GetSkipIndexBuf(cols int) *multiCosBuf
- func GetVlmCacheInitialized() bool
- func HotDataDetectorTaskLen() int
- func InitSkipIndexPool(size int)
- func InitializeVlmCache()
- func IsFullTextIdx(indexRelation *influxql.IndexRelation) bool
- func NewFileReader(reader fileops.BasicFileReader) *fileReader
- func PutSkipIndexBuf(buf *multiCosBuf)
- func SendLogRequest(readRequest *LogReadRequest)
- func SendLogRequestWithHash(object *LogPath, queryHashes map[string][]uint64)
- func SetVlmCacheInitialized(cacheEnabled bool)
- func StartHotDataDetector()
- func StopHotDataDetector()
- type BlockCache
- type BlockCacheKey
- type BlockCacheValue
- type BlockFileContainer
- func (c *BlockFileContainer) Fetch(bankId int32, blockId int32) ([]byte, bool)
- func (c *BlockFileContainer) FetchWith(bankId int32, blockId int32, offset int64, length int64) ([]byte, bool)
- func (c *BlockFileContainer) MultiFetch(banks []int32, blocks []int32, totalLength int32) ([]byte, bool)
- func (c *BlockFileContainer) MultiRemove(banks []int32, blocks []int32)
- func (c *BlockFileContainer) MultiStore(data []byte) ([]int32, []int32, bool)
- func (c *BlockFileContainer) Remove(bankId int32, blockId int32)
- func (c *BlockFileContainer) Store(data []byte) (int32, int32, bool)
- type BlockLocks
- type BlockLruCache
- func (c *BlockLruCache) Fetch(key BlockCacheKey) ([]byte, bool)
- func (c *BlockLruCache) FetchWith(key BlockCacheKey, version uint32, size int64) ([]byte, bool)
- func (c *BlockLruCache) GetBlockFileContainer(version uint32) *BlockFileContainer
- func (c *BlockLruCache) Remove(key BlockCacheKey) bool
- func (c *BlockLruCache) SetBlockFileContainer(version uint32, fileContainer *BlockFileContainer)
- func (c *BlockLruCache) Store(key BlockCacheKey, version uint32, diskData []byte)
- type BloomFilterReader
- type Constant
- type HitHashes
- type HotDataDetector
- type HotDataValue
- type HotDataValues
- type Iterator
- type LogPath
- type LogReadRequest
- type LruCache
- type LruCacheKey
- type LruCacheType
- type PrefetchType
- type RingBuf
- type VlmCacheReader
Constants ¶
const ( PieceLruSegNumber uint32 = 16 GroupLruSegNumber uint32 = 4 BankFileNumber int32 = 8 )
const ( GramTokenizerVersion = 3 CurrentLogTokenizerVersion = 5 )
const ( Year int = iota Month Day Hour Minute Second )
const ( SecCountPerMinute int = 1 * 60 SecCountPerHour = 1 * 60 * SecCountPerMinute SecCountPerDay = 1 * 24 * SecCountPerHour SecCountPerMonth = 1 * 30 * SecCountPerDay SecCountPerYear = 1 * 12 * SecCountPerMonth )
const ( OBSMetaFileName = "segment.meta" OBSVLMFileName = "segment.vlm" OBSContentFileName = "segment.cnt" FilterLogName = "segment.fll" )
const ( MaxReadRequestChanSize = 512 MaxTimeBuf = 256 TimeInverval = 30 * time.Minute FrequencyThreshold = 10 )
const (
Prime_64 uint64 = 0x9E3779B185EBCA87
)
Variables ¶
var ( HotPieceLruCache *BlockLruCache = nil HotGroupLruCache *BlockLruCache = nil BlockLruCacheInitialized = false PieceCacheEvictNum uint64 = 0 PieceCacheHitNum uint64 = 0 PieceCacheHitMiss uint64 = 0 VlmCacheReaderBeforeCache uint64 = 0 VlmCacheReaderAfterCache uint64 = 0 VlmCacheReaderBeforRead uint64 = 0 VlmCacheReaderAfterRead uint64 = 0 VlmCacheReaderAfterStore uint64 = 0 PieceCacheBeforeFetch uint64 = 0 PieceCacheAfterFetch uint64 = 0 PieceContainerBeforeFetch uint64 = 0 PieceContainerAfterFetch uint64 = 0 PieceCacheStoreBeforeFetch uint64 = 0 PieceCacheStoreAfterFetch uint64 = 0 PieceCacheBeforeStore uint64 = 0 PieceCacheAfterStore uint64 = 0 PieceContainerBeforeStore uint64 = 0 PieceContainerAfterStore uint64 = 0 )
var AggLogBucketCountList = []int{
60, 24, 36, 48, 60, 36, 42, 48, 54, 60,
22, 24, 26, 28, 30, 32, 34, 36, 38, 40,
42, 44, 46, 48, 50, 52, 54, 56, 58, 60,
31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
}
var AggLogIntervalList = GetAggLogIntervalList()
var (
PrefetchTimerInterval = 5 * time.Second
)
var SkipIndexPool pool.FixedPool
var (
Table = crc32.MakeTable(crc32.Castagnoli)
)
Functions ¶
func EvictGroupLruCache ¶ added in v1.3.0
func EvictGroupLruCache(key BlockCacheKey, value *BlockCacheValue)
func EvictPieceLruCache ¶ added in v1.3.0
func EvictPieceLruCache(key BlockCacheKey, value *BlockCacheValue)
func FlushVerticalFilter ¶
func GenSchemaIdxs ¶
func GenSchemaIdxs(schema record.Schemas, skipIndexRelation *influxql.IndexRelation, fullTextIdx bool) []int
GenSchemaIdxs get bloom filter cols index in the schema
func GetAdaptiveTimeBucket ¶
func GetAggLogIntervalList ¶
func GetAggLogIntervalList() []int
func GetBinaryMask ¶ added in v1.3.0
func GetSkipIndexBuf ¶ added in v1.3.0
func GetSkipIndexBuf(cols int) *multiCosBuf
func GetVlmCacheInitialized ¶ added in v1.3.0
func GetVlmCacheInitialized() bool
func HotDataDetectorTaskLen ¶ added in v1.3.0
func HotDataDetectorTaskLen() int
func InitSkipIndexPool ¶ added in v1.3.0
func InitSkipIndexPool(size int)
func InitializeVlmCache ¶ added in v1.3.0
func InitializeVlmCache()
func IsFullTextIdx ¶
func IsFullTextIdx(indexRelation *influxql.IndexRelation) bool
func NewFileReader ¶ added in v1.3.0
func NewFileReader(reader fileops.BasicFileReader) *fileReader
func PutSkipIndexBuf ¶ added in v1.3.0
func PutSkipIndexBuf(buf *multiCosBuf)
func SendLogRequest ¶ added in v1.3.0
func SendLogRequest(readRequest *LogReadRequest)
func SendLogRequestWithHash ¶ added in v1.3.0
func SetVlmCacheInitialized ¶ added in v1.3.0
func SetVlmCacheInitialized(cacheEnabled bool)
func StartHotDataDetector ¶ added in v1.3.0
func StartHotDataDetector()
func StopHotDataDetector ¶ added in v1.3.0
func StopHotDataDetector()
Types ¶
type BlockCache ¶ added in v1.3.0
type BlockCache interface {
Store(key BlockCacheKey, version uint32, diskData []byte)
Fetch(key BlockCacheKey) ([]byte, bool)
FetchWith(key BlockCacheKey, version uint32, size int64) ([]byte, bool)
}
type BlockCacheKey ¶ added in v1.3.0
func (BlockCacheKey) Hash ¶ added in v1.3.0
func (key BlockCacheKey) Hash() uint32
type BlockCacheValue ¶ added in v1.3.0
type BlockFileContainer ¶ added in v1.3.0
type BlockFileContainer struct {
// contains filtered or unexported fields
}
func NewBlockFileContainer ¶ added in v1.3.0
func (*BlockFileContainer) Fetch ¶ added in v1.3.0
func (c *BlockFileContainer) Fetch(bankId int32, blockId int32) ([]byte, bool)
func (*BlockFileContainer) MultiFetch ¶ added in v1.3.0
func (*BlockFileContainer) MultiRemove ¶ added in v1.3.0
func (c *BlockFileContainer) MultiRemove(banks []int32, blocks []int32)
func (*BlockFileContainer) MultiStore ¶ added in v1.3.0
func (c *BlockFileContainer) MultiStore(data []byte) ([]int32, []int32, bool)
func (*BlockFileContainer) Remove ¶ added in v1.3.0
func (c *BlockFileContainer) Remove(bankId int32, blockId int32)
type BlockLocks ¶ added in v1.3.0
type BlockLocks []uint64
func NewBlockLocks ¶ added in v1.3.0
func NewBlockLocks(size int32) BlockLocks
func (*BlockLocks) Clear ¶ added in v1.3.0
func (b *BlockLocks) Clear(index int32)
func (*BlockLocks) Empty ¶ added in v1.3.0
func (b *BlockLocks) Empty(index int32) bool
func (*BlockLocks) Next ¶ added in v1.3.0
func (b *BlockLocks) Next() int32
func (*BlockLocks) Set ¶ added in v1.3.0
func (b *BlockLocks) Set(index int32) bool
type BlockLruCache ¶ added in v1.3.0
type BlockLruCache struct {
// contains filtered or unexported fields
}
func GetHotGroupLruCache ¶ added in v1.3.0
func GetHotGroupLruCache() *BlockLruCache
func GetHotPieceLruCache ¶ added in v1.3.0
func GetHotPieceLruCache() *BlockLruCache
func NewBlockLruCache ¶ added in v1.3.0
func NewBlockLruCache(bankFilePath string, lruCacheType LruCacheType, size uint32, segmentCnt uint32, ttl time.Duration, onEvict expirable.EvictCallback[BlockCacheKey, *BlockCacheValue]) *BlockLruCache
func (*BlockLruCache) Fetch ¶ added in v1.3.0
func (c *BlockLruCache) Fetch(key BlockCacheKey) ([]byte, bool)
func (*BlockLruCache) FetchWith ¶ added in v1.3.0
func (c *BlockLruCache) FetchWith(key BlockCacheKey, version uint32, size int64) ([]byte, bool)
func (*BlockLruCache) GetBlockFileContainer ¶ added in v1.3.0
func (c *BlockLruCache) GetBlockFileContainer(version uint32) *BlockFileContainer
func (*BlockLruCache) Remove ¶ added in v1.3.0
func (c *BlockLruCache) Remove(key BlockCacheKey) bool
func (*BlockLruCache) SetBlockFileContainer ¶ added in v1.3.0
func (c *BlockLruCache) SetBlockFileContainer(version uint32, fileContainer *BlockFileContainer)
func (*BlockLruCache) Store ¶ added in v1.3.0
func (c *BlockLruCache) Store(key BlockCacheKey, version uint32, diskData []byte)
type BloomFilterReader ¶ added in v1.3.0
type BloomFilterReader interface {
Size() (int64, error)
ReadBatch(offs, sizes []int64, limit int, isStat bool, results map[int64][]byte) error
StartSpan(span *tracing.Span)
EndSpan()
Close() error
}
func NewBasicBloomfilterReader ¶ added in v1.3.0
func NewBasicBloomfilterReader(obsOpts *obs.ObsOptions, path, fileName string) (BloomFilterReader, error)
func NewBloomfilterReader ¶ added in v1.3.0
func NewBloomfilterReader(obsOpts *obs.ObsOptions, path, fileName string, version uint32) (BloomFilterReader, error)
type Constant ¶
type Constant struct {
FilterDataMemSize int64 // single bloomfilter's size
FilterDataDiskSize int64 // FilterDataDiskSize = FilterDataMemSize + crc32
FilterCntPerVerticalGorup int64 // the bloom filter's count in single vertical group
FilterGroupMaxDiskSize int64 // the maximum size of row storage before converting to vertical storage
VerticalPieceMemSize int64 // single Vertical piece's size in memory, equals 8Byte * FilterCntPerVerticalGorup
VerticalPieceDiskSize int64 // VerticalPieceDiskSize = VerticalPieceMemSize + crc32
VerticalPieceCntPerFilter int64
VerticalGroupDiskSize int64
ScanBlockCnt int
MetaDataBatchSize int
UnnestMetaDataBatchSize int
MaxBlockDataSize int
MaxLogSizeLimit int
MaxBlockLogStoreNBytes int
MaxBlockCompressedNBytes int
MaxBlockStoreNBytes int
ContentAppenderBufferSize int
}
var LogStoreConstantV0 *Constant // version[0, 1]
var LogStoreConstantV2 *Constant // version[2, ...]
func GetConstant ¶
type HitHashes ¶ added in v1.3.0
type HitHashes struct {
// contains filtered or unexported fields
}
type HotDataDetector ¶ added in v1.3.0
type HotDataDetector struct {
// contains filtered or unexported fields
}
func NewHotDataDetector ¶ added in v1.3.0
func NewHotDataDetector() *HotDataDetector
func (*HotDataDetector) Close ¶ added in v1.3.0
func (d *HotDataDetector) Close()
func (*HotDataDetector) DetectHotLogPath ¶ added in v1.3.0
func (d *HotDataDetector) DetectHotLogPath(request *LogReadRequest)
func (*HotDataDetector) Evict ¶ added in v1.3.0
func (d *HotDataDetector) Evict() bool
func (*HotDataDetector) ForceEvict ¶ added in v1.3.0
func (d *HotDataDetector) ForceEvict() bool
func (*HotDataDetector) Len ¶ added in v1.3.0
func (d *HotDataDetector) Len() int
func (*HotDataDetector) Running ¶ added in v1.3.0
func (d *HotDataDetector) Running()
type HotDataValue ¶ added in v1.3.0
type HotDataValue struct {
// contains filtered or unexported fields
}
func NewHotDataValue ¶ added in v1.3.0
func NewHotDataValue(logPath *LogPath) *HotDataValue
func (*HotDataValue) Close ¶ added in v1.3.0
func (v *HotDataValue) Close()
func (*HotDataValue) Frequency ¶ added in v1.3.0
func (v *HotDataValue) Frequency() int
func (*HotDataValue) GroupPrefetch ¶ added in v1.3.0
func (v *HotDataValue) GroupPrefetch()
func (*HotDataValue) Open ¶ added in v1.3.0
func (v *HotDataValue) Open()
func (*HotDataValue) PiecePrefetch ¶ added in v1.3.0
func (v *HotDataValue) PiecePrefetch()
func (*HotDataValue) PrefetchRutine ¶ added in v1.3.0
func (v *HotDataValue) PrefetchRutine()
func (*HotDataValue) SetVisitTime ¶ added in v1.3.0
func (v *HotDataValue) SetVisitTime(visitTime int64, hashes []uint64)
type HotDataValues ¶ added in v1.3.0
type HotDataValues []*HotDataValue
func (HotDataValues) Len ¶ added in v1.3.0
func (n HotDataValues) Len() int
func (HotDataValues) Less ¶ added in v1.3.0
func (n HotDataValues) Less(i, j int) bool
func (HotDataValues) Swap ¶ added in v1.3.0
func (n HotDataValues) Swap(i, j int)
type Iterator ¶ added in v1.3.0
type Iterator[V comparable] struct { // contains filtered or unexported fields }
func NewIterator ¶ added in v1.3.0
func NewIterator[V comparable](rb *RingBuf[V]) *Iterator[V]
type LogPath ¶ added in v1.3.0
type LogPath struct {
Path string
FileName string
Version uint32
ObsOpt *obs.ObsOptions
}
type LogReadRequest ¶ added in v1.3.0
type LogReadRequest struct {
// contains filtered or unexported fields
}
type LruCache ¶ added in v1.3.0
type LruCache[K LruCacheKey, V any] struct { // contains filtered or unexported fields }
func NewLruCache ¶ added in v1.3.0
func NewLruCache[K LruCacheKey, V any](size uint32, segmentCnt uint32, ttl time.Duration, onEvict expirable.EvictCallback[K, V]) *LruCache[K, V]
segCnt need to be
type LruCacheKey ¶ added in v1.3.0
type LruCacheKey interface {
comparable
Hash() uint32
}
type LruCacheType ¶ added in v1.3.0
type LruCacheType int
const ( PieceCacheType LruCacheType = iota GroupCacheType )
type PrefetchType ¶ added in v1.3.0
type PrefetchType int
const ( PiecePrefetch PrefetchType = iota GroupPrefetch )
type RingBuf ¶ added in v1.3.0
type RingBuf[V comparable] struct { // contains filtered or unexported fields }
RingBuf has a fixed size, and suggest setting the size to be a power of 2 When data exceeds the size, old data will be overwritten by new data.
func NewRingBuf ¶ added in v1.3.0
func NewRingBuf[V comparable](size int) *RingBuf[V]
func (*RingBuf[V]) ReadOldest ¶ added in v1.3.0
func (rb *RingBuf[V]) ReadOldest() V
func (*RingBuf[V]) RemoveOldest ¶ added in v1.3.0
func (rb *RingBuf[V]) RemoveOldest()
type VlmCacheReader ¶ added in v1.3.0
type VlmCacheReader struct {
// contains filtered or unexported fields
}
func NewVlmCacheReader ¶ added in v1.3.0
func NewVlmCacheReader(reader BloomFilterReader, path, fileName string, version uint32) *VlmCacheReader
func (*VlmCacheReader) Close ¶ added in v1.3.0
func (r *VlmCacheReader) Close() error
func (*VlmCacheReader) EndSpan ¶ added in v1.3.0
func (r *VlmCacheReader) EndSpan()
func (*VlmCacheReader) Size ¶ added in v1.3.0
func (r *VlmCacheReader) Size() (int64, error)
func (*VlmCacheReader) StartSpan ¶ added in v1.3.0
func (r *VlmCacheReader) StartSpan(span *tracing.Span)