logstore

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2025 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Overview

Copyright 2024 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2023 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	PieceLruSegNumber uint32 = 16
	GroupLruSegNumber uint32 = 4
	BankFileNumber    int32  = 8
)
View Source
const (
	GramTokenizerVersion       = 3
	CurrentLogTokenizerVersion = 5
)
View Source
const (
	Year int = iota
	Month
	Day
	Hour
	Minute
	Second
)
View Source
const (
	SecCountPerMinute int = 1 * 60
	SecCountPerHour       = 1 * 60 * SecCountPerMinute
	SecCountPerDay        = 1 * 24 * SecCountPerHour
	SecCountPerMonth      = 1 * 30 * SecCountPerDay
	SecCountPerYear       = 1 * 12 * SecCountPerMonth
)
View Source
const (
	OBSMetaFileName    = "segment.meta"
	OBSVLMFileName     = "segment.vlm"
	OBSContentFileName = "segment.cnt"
	FilterLogName      = "segment.fll"
)
View Source
const (
	MaxReadRequestChanSize = 512
	MaxTimeBuf             = 256
	TimeInverval           = 30 * time.Minute
	FrequencyThreshold     = 10
)
View Source
const (
	Prime_64 uint64 = 0x9E3779B185EBCA87
)

Variables

View Source
var (
	HotPieceLruCache         *BlockLruCache = nil
	HotGroupLruCache         *BlockLruCache = nil
	BlockLruCacheInitialized                = false

	PieceCacheEvictNum uint64 = 0
	PieceCacheHitNum   uint64 = 0
	PieceCacheHitMiss  uint64 = 0

	VlmCacheReaderBeforeCache uint64 = 0
	VlmCacheReaderAfterCache  uint64 = 0
	VlmCacheReaderBeforRead   uint64 = 0
	VlmCacheReaderAfterRead   uint64 = 0
	VlmCacheReaderAfterStore  uint64 = 0

	PieceCacheBeforeFetch     uint64 = 0
	PieceCacheAfterFetch      uint64 = 0
	PieceContainerBeforeFetch uint64 = 0
	PieceContainerAfterFetch  uint64 = 0

	PieceCacheStoreBeforeFetch uint64 = 0
	PieceCacheStoreAfterFetch  uint64 = 0
	PieceCacheBeforeStore      uint64 = 0
	PieceCacheAfterStore       uint64 = 0
	PieceContainerBeforeStore  uint64 = 0
	PieceContainerAfterStore   uint64 = 0
)
View Source
var AggLogBucketCountList = []int{
	60, 24, 36, 48, 60, 36, 42, 48, 54, 60,
	22, 24, 26, 28, 30, 32, 34, 36, 38, 40,
	42, 44, 46, 48, 50, 52, 54, 56, 58, 60,
	31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
	41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
	51, 52, 53, 54, 55, 56, 57, 58, 59, 60,
}
View Source
var AggLogIntervalList = GetAggLogIntervalList()
View Source
var (
	PrefetchTimerInterval = 5 * time.Second
)
View Source
var SkipIndexPool pool.FixedPool

Functions

func EvictGroupLruCache added in v1.3.0

func EvictGroupLruCache(key BlockCacheKey, value *BlockCacheValue)

func EvictPieceLruCache added in v1.3.0

func EvictPieceLruCache(key BlockCacheKey, value *BlockCacheValue)

func FlushVerticalFilter

func FlushVerticalFilter(filterVerBuffer, filterLogBuffer []byte) []byte

func GenSchemaIdxs

func GenSchemaIdxs(schema record.Schemas, skipIndexRelation *influxql.IndexRelation, fullTextIdx bool) []int

GenSchemaIdxs get bloom filter cols index in the schema

func GetAdaptiveTimeBucket

func GetAdaptiveTimeBucket(startTime, endTime time.Time, ascending bool) time.Duration

func GetAggLogIntervalList

func GetAggLogIntervalList() []int

func GetBinaryMask added in v1.3.0

func GetBinaryMask(segCnt uint32) (uint32, uint32)

func GetSkipIndexBuf added in v1.3.0

func GetSkipIndexBuf(cols int) *multiCosBuf

func GetVlmCacheInitialized added in v1.3.0

func GetVlmCacheInitialized() bool

func HotDataDetectorTaskLen added in v1.3.0

func HotDataDetectorTaskLen() int

func InitSkipIndexPool added in v1.3.0

func InitSkipIndexPool(size int)

func InitializeVlmCache added in v1.3.0

func InitializeVlmCache()

func IsFullTextIdx

func IsFullTextIdx(indexRelation *influxql.IndexRelation) bool

func NewFileReader added in v1.3.0

func NewFileReader(reader fileops.BasicFileReader) *fileReader

func PutSkipIndexBuf added in v1.3.0

func PutSkipIndexBuf(buf *multiCosBuf)

func SendLogRequest added in v1.3.0

func SendLogRequest(readRequest *LogReadRequest)

func SendLogRequestWithHash added in v1.3.0

func SendLogRequestWithHash(object *LogPath, queryHashes map[string][]uint64)

func SetVlmCacheInitialized added in v1.3.0

func SetVlmCacheInitialized(cacheEnabled bool)

func StartHotDataDetector added in v1.3.0

func StartHotDataDetector()

func StopHotDataDetector added in v1.3.0

func StopHotDataDetector()

Types

type BlockCache added in v1.3.0

type BlockCache interface {
	Store(key BlockCacheKey, version uint32, diskData []byte)
	Fetch(key BlockCacheKey) ([]byte, bool)
	FetchWith(key BlockCacheKey, version uint32, size int64) ([]byte, bool)
}

type BlockCacheKey added in v1.3.0

type BlockCacheKey struct {
	PathId   uint64
	Position uint64
}

func (BlockCacheKey) Hash added in v1.3.0

func (key BlockCacheKey) Hash() uint32

type BlockCacheValue added in v1.3.0

type BlockCacheValue struct {
	BankId  int32
	BlockId int32
	Version uint32 // LogShard version
}

type BlockFileContainer added in v1.3.0

type BlockFileContainer struct {
	// contains filtered or unexported fields
}

func NewBlockFileContainer added in v1.3.0

func NewBlockFileContainer(basePath string, bankNum int32, blockNum int32, blockSize int32) (*BlockFileContainer, error)

func (*BlockFileContainer) Fetch added in v1.3.0

func (c *BlockFileContainer) Fetch(bankId int32, blockId int32) ([]byte, bool)

func (*BlockFileContainer) FetchWith added in v1.3.0

func (c *BlockFileContainer) FetchWith(bankId int32, blockId int32, offset int64, length int64) ([]byte, bool)

func (*BlockFileContainer) MultiFetch added in v1.3.0

func (c *BlockFileContainer) MultiFetch(banks []int32, blocks []int32, totalLength int32) ([]byte, bool)

func (*BlockFileContainer) MultiRemove added in v1.3.0

func (c *BlockFileContainer) MultiRemove(banks []int32, blocks []int32)

func (*BlockFileContainer) MultiStore added in v1.3.0

func (c *BlockFileContainer) MultiStore(data []byte) ([]int32, []int32, bool)

func (*BlockFileContainer) Remove added in v1.3.0

func (c *BlockFileContainer) Remove(bankId int32, blockId int32)

func (*BlockFileContainer) Store added in v1.3.0

func (c *BlockFileContainer) Store(data []byte) (int32, int32, bool)

type BlockLocks added in v1.3.0

type BlockLocks []uint64

func NewBlockLocks added in v1.3.0

func NewBlockLocks(size int32) BlockLocks

func (*BlockLocks) Clear added in v1.3.0

func (b *BlockLocks) Clear(index int32)

func (*BlockLocks) Empty added in v1.3.0

func (b *BlockLocks) Empty(index int32) bool

func (*BlockLocks) Next added in v1.3.0

func (b *BlockLocks) Next() int32

func (*BlockLocks) Set added in v1.3.0

func (b *BlockLocks) Set(index int32) bool

type BlockLruCache added in v1.3.0

type BlockLruCache struct {
	// contains filtered or unexported fields
}

func GetHotGroupLruCache added in v1.3.0

func GetHotGroupLruCache() *BlockLruCache

func GetHotPieceLruCache added in v1.3.0

func GetHotPieceLruCache() *BlockLruCache

func NewBlockLruCache added in v1.3.0

func NewBlockLruCache(bankFilePath string, lruCacheType LruCacheType, size uint32, segmentCnt uint32, ttl time.Duration,
	onEvict expirable.EvictCallback[BlockCacheKey, *BlockCacheValue]) *BlockLruCache

func (*BlockLruCache) Fetch added in v1.3.0

func (c *BlockLruCache) Fetch(key BlockCacheKey) ([]byte, bool)

func (*BlockLruCache) FetchWith added in v1.3.0

func (c *BlockLruCache) FetchWith(key BlockCacheKey, version uint32, size int64) ([]byte, bool)

func (*BlockLruCache) GetBlockFileContainer added in v1.3.0

func (c *BlockLruCache) GetBlockFileContainer(version uint32) *BlockFileContainer

func (*BlockLruCache) Remove added in v1.3.0

func (c *BlockLruCache) Remove(key BlockCacheKey) bool

func (*BlockLruCache) SetBlockFileContainer added in v1.3.0

func (c *BlockLruCache) SetBlockFileContainer(version uint32, fileContainer *BlockFileContainer)

func (*BlockLruCache) Store added in v1.3.0

func (c *BlockLruCache) Store(key BlockCacheKey, version uint32, diskData []byte)

type BloomFilterReader added in v1.3.0

type BloomFilterReader interface {
	Size() (int64, error)
	ReadBatch(offs, sizes []int64, limit int, isStat bool, results map[int64][]byte) error
	StartSpan(span *tracing.Span)
	EndSpan()
	Close() error
}

func NewBasicBloomfilterReader added in v1.3.0

func NewBasicBloomfilterReader(obsOpts *obs.ObsOptions, path, fileName string) (BloomFilterReader, error)

func NewBloomfilterReader added in v1.3.0

func NewBloomfilterReader(obsOpts *obs.ObsOptions, path, fileName string, version uint32) (BloomFilterReader, error)

type Constant

type Constant struct {
	FilterDataMemSize         int64 // single bloomfilter's size
	FilterDataDiskSize        int64 // FilterDataDiskSize = FilterDataMemSize + crc32
	FilterCntPerVerticalGorup int64 // the bloom filter's count in single vertical group
	FilterGroupMaxDiskSize    int64 // the maximum size of row storage before converting to vertical storage

	VerticalPieceMemSize      int64 // single Vertical piece's size in memory, equals 8Byte * FilterCntPerVerticalGorup
	VerticalPieceDiskSize     int64 // VerticalPieceDiskSize = VerticalPieceMemSize + crc32
	VerticalPieceCntPerFilter int64
	VerticalGroupDiskSize     int64
	ScanBlockCnt              int
	MetaDataBatchSize         int
	UnnestMetaDataBatchSize   int

	MaxBlockDataSize       int
	MaxLogSizeLimit        int
	MaxBlockLogStoreNBytes int

	MaxBlockCompressedNBytes int
	MaxBlockStoreNBytes      int

	ContentAppenderBufferSize int
}
var LogStoreConstantV0 *Constant // version[0, 1]
var LogStoreConstantV2 *Constant // version[2, ...]

func GetConstant

func GetConstant(version uint32) *Constant

func InitConstant

func InitConstant(filterDataMemSize, filterCntPerVerticalGorup int64, scanBlockCnt, metaDataBatchSize, unnestMetaDataBatchSize int,
	maxBlockDataSize, maxBlockCompressedNBytes, ContentAppenderBufferSize int) *Constant

type HitHashes added in v1.3.0

type HitHashes struct {
	// contains filtered or unexported fields
}

type HotDataDetector added in v1.3.0

type HotDataDetector struct {
	// contains filtered or unexported fields
}

func NewHotDataDetector added in v1.3.0

func NewHotDataDetector() *HotDataDetector

func (*HotDataDetector) Close added in v1.3.0

func (d *HotDataDetector) Close()

func (*HotDataDetector) DetectHotLogPath added in v1.3.0

func (d *HotDataDetector) DetectHotLogPath(request *LogReadRequest)

func (*HotDataDetector) Evict added in v1.3.0

func (d *HotDataDetector) Evict() bool

func (*HotDataDetector) ForceEvict added in v1.3.0

func (d *HotDataDetector) ForceEvict() bool

func (*HotDataDetector) Len added in v1.3.0

func (d *HotDataDetector) Len() int

func (*HotDataDetector) Running added in v1.3.0

func (d *HotDataDetector) Running()

type HotDataValue added in v1.3.0

type HotDataValue struct {
	// contains filtered or unexported fields
}

func NewHotDataValue added in v1.3.0

func NewHotDataValue(logPath *LogPath) *HotDataValue

func (*HotDataValue) Close added in v1.3.0

func (v *HotDataValue) Close()

func (*HotDataValue) Frequency added in v1.3.0

func (v *HotDataValue) Frequency() int

func (*HotDataValue) GroupPrefetch added in v1.3.0

func (v *HotDataValue) GroupPrefetch()

func (*HotDataValue) Open added in v1.3.0

func (v *HotDataValue) Open()

func (*HotDataValue) PiecePrefetch added in v1.3.0

func (v *HotDataValue) PiecePrefetch()

func (*HotDataValue) PrefetchRutine added in v1.3.0

func (v *HotDataValue) PrefetchRutine()

func (*HotDataValue) SetVisitTime added in v1.3.0

func (v *HotDataValue) SetVisitTime(visitTime int64, hashes []uint64)

type HotDataValues added in v1.3.0

type HotDataValues []*HotDataValue

func (HotDataValues) Len added in v1.3.0

func (n HotDataValues) Len() int

func (HotDataValues) Less added in v1.3.0

func (n HotDataValues) Less(i, j int) bool

func (HotDataValues) Swap added in v1.3.0

func (n HotDataValues) Swap(i, j int)

type Iterator added in v1.3.0

type Iterator[V comparable] struct {
	// contains filtered or unexported fields
}

func NewIterator added in v1.3.0

func NewIterator[V comparable](rb *RingBuf[V]) *Iterator[V]

func (*Iterator[V]) Next added in v1.3.0

func (it *Iterator[V]) Next() (V, bool)

type LogPath added in v1.3.0

type LogPath struct {
	Path     string
	FileName string
	Version  uint32
	ObsOpt   *obs.ObsOptions
}

type LogReadRequest added in v1.3.0

type LogReadRequest struct {
	// contains filtered or unexported fields
}

type LruCache added in v1.3.0

type LruCache[K LruCacheKey, V any] struct {
	// contains filtered or unexported fields
}

func NewLruCache added in v1.3.0

func NewLruCache[K LruCacheKey, V any](size uint32, segmentCnt uint32, ttl time.Duration, onEvict expirable.EvictCallback[K, V]) *LruCache[K, V]

segCnt need to be

func (*LruCache[K, V]) Add added in v1.3.0

func (cache *LruCache[K, V]) Add(key K, value V)

func (*LruCache[K, V]) Get added in v1.3.0

func (cache *LruCache[K, V]) Get(key K) (V, bool)

func (*LruCache[K, V]) Len added in v1.3.0

func (cache *LruCache[K, V]) Len() int

func (*LruCache[K, V]) Remove added in v1.3.0

func (cache *LruCache[K, V]) Remove(key K) bool

type LruCacheKey added in v1.3.0

type LruCacheKey interface {
	comparable
	Hash() uint32
}

type LruCacheType added in v1.3.0

type LruCacheType int
const (
	PieceCacheType LruCacheType = iota
	GroupCacheType
)

type PrefetchType added in v1.3.0

type PrefetchType int
const (
	PiecePrefetch PrefetchType = iota
	GroupPrefetch
)

type RingBuf added in v1.3.0

type RingBuf[V comparable] struct {
	// contains filtered or unexported fields
}

RingBuf has a fixed size, and suggest setting the size to be a power of 2 When data exceeds the size, old data will be overwritten by new data.

func NewRingBuf added in v1.3.0

func NewRingBuf[V comparable](size int) *RingBuf[V]

func (*RingBuf[V]) Empty added in v1.3.0

func (rb *RingBuf[V]) Empty() bool

func (*RingBuf[V]) Len added in v1.3.0

func (rb *RingBuf[V]) Len() int

func (*RingBuf[V]) Read added in v1.3.0

func (rb *RingBuf[V]) Read(idx int) V

func (*RingBuf[V]) ReadOldest added in v1.3.0

func (rb *RingBuf[V]) ReadOldest() V

func (*RingBuf[V]) RemoveOldest added in v1.3.0

func (rb *RingBuf[V]) RemoveOldest()

func (*RingBuf[V]) Reset added in v1.3.0

func (rb *RingBuf[V]) Reset()

func (*RingBuf[V]) Size added in v1.3.0

func (rb *RingBuf[V]) Size() int

func (*RingBuf[V]) Write added in v1.3.0

func (rb *RingBuf[V]) Write(value V)

type VlmCacheReader added in v1.3.0

type VlmCacheReader struct {
	// contains filtered or unexported fields
}

func NewVlmCacheReader added in v1.3.0

func NewVlmCacheReader(reader BloomFilterReader, path, fileName string, version uint32) *VlmCacheReader

func (*VlmCacheReader) Close added in v1.3.0

func (r *VlmCacheReader) Close() error

func (*VlmCacheReader) EndSpan added in v1.3.0

func (r *VlmCacheReader) EndSpan()

func (*VlmCacheReader) ReadBatch added in v1.3.0

func (r *VlmCacheReader) ReadBatch(offs, sizes []int64, limit int, isStat bool, result map[int64][]byte) error

func (*VlmCacheReader) Size added in v1.3.0

func (r *VlmCacheReader) Size() (int64, error)

func (*VlmCacheReader) StartSpan added in v1.3.0

func (r *VlmCacheReader) StartSpan(span *tracing.Span)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL