engine

package
v1.5.2
Published: Jan 9, 2026 License: Apache-2.0 Imports: 95 Imported by: 0

Documentation

Overview

Copyright 2022, 2024 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

const (
	MetaIndexAndBlockIdDuration = "meta_index_duration"
	PrimaryKeyDuration          = "primary_key_duration"
)
const (
	HybridStoreReaderChunkNum = 7
	SegmentBatchCount         = 128
)
const (
	DelIndexBuilderId = math.MaxUint64
	// DefaultDirPerm is the default directory permission (user: read, write, execute; group: read, execute; others: none)
	DefaultDirPerm = 0750
)
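
A minimal usage sketch (the directory path is a caller-supplied placeholder): DefaultDirPerm can be passed anywhere an fs.FileMode is expected.

import "os"

func ensureDataDir(dir string) error {
	// Create the directory tree with the package's default permissions (rwxr-x---).
	return os.MkdirAll(dir, DefaultDirPerm)
}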
const (
	SidSequenceReaderRecordNum = 6
	SequenceAggRecordNum       = 3
)
const (
	MaxRetryUpdateOnShardNum = 4

	CRCLen     = 4
	BufferSize = 1024 * 1024

	// OBSFileExtension is the extension used for OBS files.
	OBSFileExtension = ".init"
)
const (
	Failpoint = "failpoint"

	BackgroundReadLimiter = "backgroundReadLimiter"
)
const (
	TierLeveMem           = 1 // in memory
	TierLeveLocalDisk     = 2 // on local disk
	TierLeveObjectStorage = 3 // in object storage
)
const (
	DefaultFileSize   = 10 * 1024 * 1024
	WALFileSuffixes   = "wal"
	StreamWalDir      = "stream"
	WalRecordHeadSize = 1 + 4
	WalCompBufSize    = 256 * 1024
	WalCompMaxBufSize = 2 * 1024 * 1024
)
const (
	WriteWalUnKnownType = iota
	WriteWalLineProtocol
	WriteWalArrowFlight
	WriteWalEnd
)
const ColumnStoreReaderChunkNum = 7
const ColumnStoreReaderNoCopyChunkNum = 12 // hashMergeTransform noCopy send + topnTransform bufChunk cache
const ColumnStoreReaderNoCopyRecordNum = 12 // hashMerge 4 + indexScan 3 + topn 2 + reader 3
const ColumnStoreReaderRecordNum = 7
const DefaultUploadFrequence = 500 * time.Millisecond
const (
	IncDataSegmentNum = 16
)
const MaxFileInfoSize = 1024
const WalFilePathReg = "/wal/(\\w+)/(\\d+)/(\\w+)/(\\d+)_(\\d+)_(\\d+)_(\\d+)/(.*)"
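
WalFilePathReg's capture groups encode the WAL directory layout. Below is a hedged sketch of matching it directly with the standard regexp package; the sample path is hypothetical, and ParseWalFilePath is the package's own helper for this.

import (
	"fmt"
	"regexp"
)

func matchWalPath() {
	re := regexp.MustCompile(WalFilePathReg)
	// Hypothetical layout: /wal/<db>/<ptID>/<rp>/<shard dir>/<file>
	path := "/wal/db0/1/autogen/7_1609459200000000000_1612137600000000000_7/1.wal"
	if m := re.FindStringSubmatch(path); m != nil {
		// m[1]=db, m[2]=ptID, m[3]=rp, m[4..7]=shard directory fields, m[8]=file name
		fmt.Println(m[1], m[2], m[3], m[8]) // db0 1 autogen 1.wal
	}
}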

Variables

var AggFactory = make(map[string]AggOperator)
var AppendManyNils map[int]func(colVal *record.ColVal, count int)
var DeleteDatabaseTimeout = time.Second * 15
var (
	DownSampleWriteDrop = true
)
var (
	IntervalRecordPool = record.NewRecordPool(record.IntervalRecordPool)
)
var (
	RecordIteratorPool = &sync.Pool{}
)

Functions

func AddLocations added in v1.3.0

func AddLocations(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64, metaCtx *immutable.ChunkMetaContext) error

func AddLocationsWithFirstTime

func AddLocationsWithFirstTime(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AddLocationsWithInit

func AddLocationsWithInit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) error

func AddLocationsWithLimit

func AddLocationsWithLimit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AppendColumnTimes

func AppendColumnTimes(bitmap []bool, column executor.Column, columnTimes []int64, recCol *record.ColVal)

func AppendNilRowWithTime added in v1.0.0

func AppendNilRowWithTime(rec *record.Record, t int64)

func AppendRecWithNilRows added in v1.0.0

func AppendRecWithNilRows(rec, re *record.Record, opt hybridqp.Options, seriesStart, seriesEnd, shardStart int64, last bool)

func CanNotAggOnSeriesFunc

func CanNotAggOnSeriesFunc(m map[string]*influxql.Call) bool

func CreateCursor added in v1.4.0

func CreateCursor(ctx context.Context, schema *executor.QuerySchema, span *tracing.Span, shards []*shard, tagSets []tsi.TagSet, seriesNum int) (comm.TSIndexInfo, error)

func CreateTagSetInParallel added in v1.4.0

func CreateTagSetInParallel(work func(int, int, bool, *sync.WaitGroup, tsi.TagSet), subTagSetN int, tagSet tsi.TagSet)

func CreateTagSetInSerial added in v1.4.0

func CreateTagSetInSerial(work func(int, int, bool, *sync.WaitGroup, tsi.TagSet), subTagSetN int, tagSet tsi.TagSet)

func FilterInstantNANPoint added in v1.4.0

func FilterInstantNANPoint(rec, outRecord *record.Record) *record.Record

func FilterRangeNANPoint added in v1.4.0

func FilterRangeNANPoint(rec *record.Record) *record.Record

func FloatCountPromMerge added in v1.3.0

func FloatCountPromMerge(prevBuf, currBuf *floatColBuf)

func FloatCountPromReduce added in v1.3.0

func FloatCountPromReduce(cv *record.ColVal, values []float64, start, end int) (int, float64, bool)

func GenMergeShardPath added in v1.5.0

func GenMergeShardPath(rpPath string, startTime int64, endTime int64, shardId uint64, indexId uint64) string

func GenPeerPtFilePath added in v1.3.0

func GenPeerPtFilePath(sh Shard, peersPtIDMap map[uint32]*NodeInfo, nodePath, fullPath string) []string

func GenShardDirPath added in v1.3.0

func GenShardDirPath(metaClient meta.MetaClient, dbName string, ptId uint32) (map[uint32]*NodeInfo, error)

func GetCtx added in v1.3.0

func GetCtx(querySchema *executor.QuerySchema) (*idKeyCursorContext, error)

func GetIntersectTimeRange added in v1.3.0

func GetIntersectTimeRange(queryStartTime, queryEndTime, shardStartTime, shardEndTime int64) util.TimeRange

GetIntersectTimeRange is used to get the intersection of the query time range and the shard time range.
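
A minimal standalone sketch of the presumed semantics (TimeRange here is a stand-in for util.TimeRange): the intersection takes the later of the two starts and the earlier of the two ends.

type TimeRange struct{ Min, Max int64 }

// intersect clamps the query range to the shard range, mirroring what
// GetIntersectTimeRange is documented to do.
func intersect(queryStart, queryEnd, shardStart, shardEnd int64) TimeRange {
	tr := TimeRange{Min: queryStart, Max: queryEnd}
	if shardStart > tr.Min {
		tr.Min = shardStart
	}
	if shardEnd < tr.Max {
		tr.Max = shardEnd
	}
	return tr
}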

func GetMaxTime

func GetMaxTime(maxTime int64, rec *record.Record, isAscending bool) int64

func GetMemUsageLimit added in v1.1.0

func GetMemUsageLimit() int32

func GetMinTime

func GetMinTime(minTime int64, rec *record.Record, isAscending bool) int64

func InitMergeShardDirs added in v1.5.0

func InitMergeShardDirs(mergeShardPath string, lock fileops.FileLockOption, engineType config.EngineType) error

func IsMemUsageExceeded added in v1.1.0

func IsMemUsageExceeded() bool

func IsSameStep added in v1.4.0

func IsSameStep(startSample, endSample, step, duration, currT, nextT int64) bool

func MarshalWithMeasurements added in v1.5.0

func MarshalWithMeasurements(buf []byte, mst string, rec *record.Record) ([]byte, error)

func MaxPromMerge added in v1.3.0

func MaxPromMerge(prevBuf, currBuf *floatColBuf)

func MaxPromReduce added in v1.3.0

func MaxPromReduce(_ *record.ColVal, values []float64, start, end int) (int, float64, bool)

func MinPromMerge added in v1.3.0

func MinPromMerge(prevBuf, currBuf *floatColBuf)

func MinPromReduce added in v1.3.0

func MinPromReduce(_ *record.ColVal, values []float64, start, end int) (int, float64, bool)

func NewAggregateCursor

func NewAggregateCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, hasAuxTags bool) *aggregateCursor

func NewAttachedIndexReader added in v1.2.0

func NewAttachedIndexReader(ctx *indexContext, info *executor.AttachedIndexInfo, readerCtx *immutable.FileReaderContext) *attachedIndexReader

func NewChunkReader

func NewChunkReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	schema *executor.QuerySchema, cursors []interface{}, state bool) executor.Processor

func NewDetachedIndexReader added in v1.2.0

func NewDetachedIndexReader(ctx *indexContext, obsOption *obs.ObsOptions, readerCtx *immutable.FileReaderContext) *detachedIndexReader

func NewDetachedLazyLoadIndexReader added in v1.3.0

func NewDetachedLazyLoadIndexReader(ctx *indexContext, obsOption *obs.ObsOptions, readerCtx *immutable.FileReaderContext) *detachedLazyLoadIndexReader

func NewFencer added in v1.0.0

func NewFencer(dataPath, walPath, db string, pt uint32) fencer

func NewFileLoopCursor

func NewFileLoopCursor(ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema,
	tagSet tsi.TagSetEx, start, step int, s *shard) *fileLoopCursor

func NewFileSequenceAggregator added in v1.0.0

func NewFileSequenceAggregator(schema hybridqp.Catalog, addPrefix bool, shardStartTime, shardEndTime int64) executor.Processor

func NewFloatColFloatHeapReducer

func NewFloatColFloatHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, floatHeapItem *FloatHeapItem) *floatColFloatHeapReducer

func NewIndexContext added in v1.2.0

func NewIndexContext(readBatch bool, batchCount int, schema hybridqp.Catalog, shardPath string) *indexContext

func NewIntegerColIntegerHeapReducer

func NewIntegerColIntegerHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, integerHeapItem *IntegerHeapItem) *integerColIntegerHeapReducer

func NewLimitCursor

func NewLimitCursor(schema *executor.QuerySchema, helper func(start, end int, src, des *record.Record)) *limitCursor

func NewRecordSchema

func NewRecordSchema(querySchema *executor.QuerySchema, auxTags []string, schema record.Schemas, filterConditions []*influxql.VarRef, engineType config.EngineType) ([]string, record.Schemas)

func NewSeriesInfoPool

func NewSeriesInfoPool(num int64) *filesInfoPool

func NewShard

func NewShard(dataPath, walPath string, lockPath *string, ident *meta.ShardIdentifier, durationInfo *meta.DurationDescriptor, tr *meta.TimeRangeInfo,
	options EngineOptions, engineType config.EngineType, ch chan []immutable.FileInfoExtend) *shard

func NewTagSetCursorForTest

func NewTagSetCursorForTest(schema *executor.QuerySchema, seriesN int) *tagSetCursor

NewTagSetCursorForTest is for unit tests only and will be removed later.

func NewTopNLinkedList

func NewTopNLinkedList(n int, ascending bool) *topNLinkedList

func NewTsIndexInfo added in v1.4.0

func NewTsIndexInfo(immTables []*immutable.MmsReaders, memTables []MemDataReader, cursors []comm.KeyCursor) comm.TSIndexInfo

func NewTsspSequenceReader added in v1.0.0

func NewTsspSequenceReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, files *immutable.TSSPFiles, newSeqs []uint64, stop chan struct{}) executor.Processor

func NewWriteIntoStorageTransform added in v1.0.0

func NewWriteIntoStorageTransform(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, conf *immutable.Config, m *immutable.MmsTables, addPrefix bool) executor.Processor

func ParseSteamWalFilename added in v1.4.0

func ParseSteamWalFilename(name string) (int64, int64)

func ParseWalFilePath added in v1.5.0

func ParseWalFilePath(name, walPath string) (db, rp string, ptID uint32, shardID uint64, err error)

func RecordCutNormal

func RecordCutNormal(start, end int, src, dst *record.Record)

func RegisterNewEngineFun added in v1.5.0

func RegisterNewEngineFun(name string, fn NewEngineFun)

func RegisteredEngines added in v1.5.0

func RegisteredEngines() []string

func RegistryAggOp added in v1.3.0

func RegistryAggOp(name string, aggOp AggOperator)

func RegistryPromFunction added in v1.3.0

func RegistryPromFunction(name string, aggOp PromFunction)

func RemoveWalFiles added in v1.4.0

func RemoveWalFiles(files *WalFiles) error

func SetDelMergeSetForEachMergeSet added in v1.5.0

func SetDelMergeSetForEachMergeSet(dbPT *DBPTInfo, rp string) error

func SetFullCompColdDuration

func SetFullCompColdDuration(d time.Duration)

func SetNextMethod added in v1.0.0

func SetNextMethod(cursor comm.KeyCursor)

SetNextMethod is for test use only.

func SetParquetTaskConfig added in v1.5.0

func SetParquetTaskConfig(paras map[string]string) error

func UnmarshalWithMeasurements added in v1.5.0

func UnmarshalWithMeasurements(buf []byte, rec *record.Record) (string, error)
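
A hedged round-trip sketch for the pair above; assembling a populated record.Record is package-specific and elided here, and the buf parameter is assumed to be an append target.

func roundTrip(rec *record.Record) error {
	// Marshal the record together with its measurement name, appending to an empty buffer.
	buf, err := MarshalWithMeasurements(nil, "cpu", rec)
	if err != nil {
		return err
	}
	out := &record.Record{}
	mst, err := UnmarshalWithMeasurements(buf, out)
	if err != nil {
		return err
	}
	_ = mst // should round-trip back to "cpu"
	return nil
}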

Types

type AggOperator added in v1.3.0

type AggOperator interface {
	CreateRoutine(*AggParams) (Routine, error)
}

func GetAggOperator added in v1.3.0

func GetAggOperator(name string) AggOperator
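
A hedged sketch of the registry pattern implied by RegistryAggOp and GetAggOperator; the operator name "noop" and the type noopOp are hypothetical, and the body of CreateRoutine is elided because Routine construction is internal to the package.

type noopOp struct{}

func (o *noopOp) CreateRoutine(p *AggParams) (Routine, error) {
	return nil, nil // a real operator would build a Routine from p
}

func init() {
	// Register once at startup; queries can then look the operator up by name.
	RegistryAggOp("noop", &noopOp{})
}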

type AggParams added in v1.3.0

type AggParams struct {
	// contains filtered or unexported fields
}

func NewAggParams added in v1.3.0

func NewAggParams(inSchema, outSchema record.Schemas, opt hybridqp.ExprOptions, auxOp []*auxProcessor) *AggParams

type AggTagSetCursor

type AggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewAggTagSetCursor

func NewAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor, singleSeries bool) *AggTagSetCursor

func (*AggTagSetCursor) Close

func (s *AggTagSetCursor) Close() error

func (*AggTagSetCursor) EndSpan

func (s *AggTagSetCursor) EndSpan()

func (*AggTagSetCursor) GetIndex added in v1.0.0

func (s *AggTagSetCursor) GetIndex(t int64) int64

func (*AggTagSetCursor) GetSchema

func (s *AggTagSetCursor) GetSchema() record.Schemas

func (*AggTagSetCursor) Init

func (s *AggTagSetCursor) Init()

func (*AggTagSetCursor) Name

func (s *AggTagSetCursor) Name() string

func (*AggTagSetCursor) Next

func (*AggTagSetCursor) NextAggData

func (s *AggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*AggTagSetCursor) NextWithMultipleSeries added in v1.0.0

func (s *AggTagSetCursor) NextWithMultipleSeries() (*record.Record, comm.SeriesInfoIntf, error)

func (*AggTagSetCursor) NextWithSingleSeries added in v1.0.0

func (s *AggTagSetCursor) NextWithSingleSeries() (*record.Record, comm.SeriesInfoIntf, error)

func (*AggTagSetCursor) RecordInit

func (s *AggTagSetCursor) RecordInit() error

func (*AggTagSetCursor) SetOps

func (s *AggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*AggTagSetCursor) SetParaForTest added in v1.0.0

func (s *AggTagSetCursor) SetParaForTest(schema record.Schemas)

func (*AggTagSetCursor) SetSchema

func (s *AggTagSetCursor) SetSchema(schema record.Schemas)

func (*AggTagSetCursor) SinkPlan

func (s *AggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*AggTagSetCursor) StartSpan

func (s *AggTagSetCursor) StartSpan(span *tracing.Span)

func (*AggTagSetCursor) TagAuxHandler

func (s *AggTagSetCursor) TagAuxHandler(re *record.Record, start, end int)

func (*AggTagSetCursor) TimeWindowsInit

func (s *AggTagSetCursor) TimeWindowsInit()

func (*AggTagSetCursor) UpdateRec added in v1.0.0

func (s *AggTagSetCursor) UpdateRec(recRow, chunkRow int)

type Backup added in v1.3.0

type Backup struct {
	Name string

	BackupLogInfo *backup.BackupLogInfo
	Engine        *EngineImpl
	IsAborted     bool
	Status        BackupStatus

	IsInc           bool
	IsRemote        bool
	OnlyBackupMater bool
	BackupPath      string
	DataBases       []string
	// contains filtered or unexported fields
}

func (*Backup) BackupPt added in v1.3.0

func (s *Backup) BackupPt(dbName string, ptId uint32) error

func (*Backup) FullBackup added in v1.3.0

func (s *Backup) FullBackup(sh Shard, dataPath, nodePath string, peersPtIDMap map[uint32]*NodeInfo) error

func (*Backup) FullBackupTableFile added in v1.3.0

func (s *Backup) FullBackupTableFile(sh Shard, t immutable.TablesStore, peersPtIDMap map[uint32]*NodeInfo, name string, isOrder bool, nodePath, outPath string) ([][]string, error)

func (*Backup) IncBackup added in v1.3.0

func (s *Backup) IncBackup(sh Shard, dataPath, nodePath string, peersPtIDMap map[uint32]*NodeInfo) error

func (*Backup) IncBackupTableFile added in v1.3.0

func (s *Backup) IncBackupTableFile(sh Shard, t immutable.TablesStore, peersPtIDMap map[uint32]*NodeInfo, name string, isOrder bool, nodePath, outPath string) ([][]string, [][]string, error)

func (*Backup) RunBackupData added in v1.3.0

func (s *Backup) RunBackupData() error

func (*Backup) TraversePts added in v1.5.0

func (s *Backup) TraversePts(dbName string, pts []uint32) error

type BackupStatus added in v1.5.0

type BackupStatus string
const (
	Success     BackupStatus = "backup success"
	Failed      BackupStatus = "backup field"
	InProgress  BackupStatus = "backup in progress"
	NotBackedUp BackupStatus = "not backed up"
)

type BasePromOp added in v1.3.0

type BasePromOp struct {
	// contains filtered or unexported fields
}

func NewBasePromOp added in v1.3.0

func NewBasePromOp(name string, fn floatColFloatReduce, fv floatColFloatMerge) *BasePromOp

func (*BasePromOp) CreateRoutine added in v1.3.0

func (c *BasePromOp) CreateRoutine(p *AggParams) (Routine, error)

type ChunkMetaByField added in v1.0.0

type ChunkMetaByField struct {
	// contains filtered or unexported fields
}

ChunkMetaByField is built from each chunk meta. ChunkMetaByField obtains data by sid, column by column.

func NewChunkMetaByField added in v1.0.0

func NewChunkMetaByField(file immutable.TSSPFile, fieldIter *FieldIter, chunkMeta immutable.ChunkMeta, recordPool *record.CircularRecordPool) *ChunkMetaByField

type ChunkMetaByFieldIters added in v1.0.0

type ChunkMetaByFieldIters struct {
	// contains filtered or unexported fields
}

ChunkMetaByFieldIters is the iterator of ChunkMetaByField.

func NewChunkMetaByFieldIters added in v1.0.0

func NewChunkMetaByFieldIters(chunkMetas []immutable.ChunkMeta, file immutable.TSSPFile, fieldIter *FieldIter, recordPool *record.CircularRecordPool) *ChunkMetaByFieldIters

type ChunkReader

type ChunkReader struct {
	executor.BaseProcessor

	Output          *executor.ChunkPort
	ResultChunkPool *executor.CircularChunkPool
	// contains filtered or unexported fields
}

func (*ChunkReader) Abort added in v1.0.1

func (r *ChunkReader) Abort()

func (*ChunkReader) Close

func (r *ChunkReader) Close()

func (*ChunkReader) Create

func (*ChunkReader) Explain

func (r *ChunkReader) Explain() []executor.ValuePair

func (*ChunkReader) GetInputNumber

func (r *ChunkReader) GetInputNumber(executor.Port) int

func (*ChunkReader) GetInputs

func (r *ChunkReader) GetInputs() executor.Ports

func (*ChunkReader) GetOutputNumber

func (r *ChunkReader) GetOutputNumber(executor.Port) int

func (*ChunkReader) GetOutputs

func (r *ChunkReader) GetOutputs() executor.Ports

func (*ChunkReader) IsSink

func (r *ChunkReader) IsSink() bool

func (*ChunkReader) Name

func (r *ChunkReader) Name() string

func (*ChunkReader) Release

func (r *ChunkReader) Release() error

func (*ChunkReader) Work

func (r *ChunkReader) Work(ctx context.Context) error

type CoProcessor

type CoProcessor interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type CoProcessorImpl

type CoProcessorImpl struct {
	Routines []Routine
}

func NewCoProcessorImpl

func NewCoProcessorImpl(routines ...Routine) *CoProcessorImpl

func (*CoProcessorImpl) AppendRoutine

func (p *CoProcessorImpl) AppendRoutine(routines ...Routine)

func (*CoProcessorImpl) WorkOnRecord

func (p *CoProcessorImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)

type ColumnStoreImpl added in v1.5.0

type ColumnStoreImpl struct {
	// contains filtered or unexported fields
}

func (*ColumnStoreImpl) ForceFlush added in v1.5.0

func (storage *ColumnStoreImpl) ForceFlush(s *shard)

func (*ColumnStoreImpl) SetAccumulateMetaIndex added in v1.5.0

func (storage *ColumnStoreImpl) SetAccumulateMetaIndex(name string, aMetaIndex *immutable.AccumulateMetaIndex)

func (*ColumnStoreImpl) WriteCols added in v1.5.0

func (storage *ColumnStoreImpl) WriteCols(s *shard, cols *record.Record, mst string, binaryCols []byte) error

func (*ColumnStoreImpl) WriteIndex added in v1.5.0

func (storage *ColumnStoreImpl) WriteIndex(idx *tsi.IndexBuilder, mw *mstWriteCtx) func() error

func (*ColumnStoreImpl) WriteIndexForCols added in v1.5.0

func (storage *ColumnStoreImpl) WriteIndexForCols(s *shard, cols *record.Record, mstName string) error

type ColumnStoreReader added in v1.1.0

type ColumnStoreReader struct {
	executor.BaseProcessor
	// contains filtered or unexported fields
}

func NewColumnStoreReader added in v1.1.0

func NewColumnStoreReader(plan hybridqp.QueryNode, frags executor.ShardsFragments) *ColumnStoreReader

func (*ColumnStoreReader) Abort added in v1.1.0

func (r *ColumnStoreReader) Abort()

func (*ColumnStoreReader) Close added in v1.1.0

func (r *ColumnStoreReader) Close()

func (*ColumnStoreReader) Explain added in v1.1.0

func (r *ColumnStoreReader) Explain() []executor.ValuePair

func (*ColumnStoreReader) FragmentCount added in v1.1.0

func (r *ColumnStoreReader) FragmentCount() int

func (*ColumnStoreReader) GetInputNumber added in v1.1.0

func (r *ColumnStoreReader) GetInputNumber(_ executor.Port) int

func (*ColumnStoreReader) GetInputs added in v1.1.0

func (r *ColumnStoreReader) GetInputs() executor.Ports

func (*ColumnStoreReader) GetOutputNumber added in v1.1.0

func (r *ColumnStoreReader) GetOutputNumber(_ executor.Port) int

func (*ColumnStoreReader) GetOutputs added in v1.1.0

func (r *ColumnStoreReader) GetOutputs() executor.Ports

func (*ColumnStoreReader) IsSink added in v1.1.0

func (r *ColumnStoreReader) IsSink() bool

func (*ColumnStoreReader) Name added in v1.1.0

func (r *ColumnStoreReader) Name() string

func (*ColumnStoreReader) Release added in v1.1.0

func (r *ColumnStoreReader) Release() error

func (*ColumnStoreReader) Run added in v1.1.0

func (r *ColumnStoreReader) Run(ctx context.Context) (iterCount, rowCountBeforeFilter, rowCountAfterFilter int, err error)

func (*ColumnStoreReader) Work added in v1.1.0

func (r *ColumnStoreReader) Work(ctx context.Context) error

type ColumnStoreReaderCreator added in v1.1.0

type ColumnStoreReaderCreator struct {
}

func (*ColumnStoreReaderCreator) Create added in v1.1.0

func (*ColumnStoreReaderCreator) CreateReader added in v1.1.0

func (c *ColumnStoreReaderCreator) CreateReader(plan hybridqp.QueryNode, indexInfo interface{}) (executor.Processor, error)

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor() *Compactor

func (*Compactor) RegisterShard

func (c *Compactor) RegisterShard(sh *shard)

func (*Compactor) SetAllOutOfOrderMergeSwitch

func (c *Compactor) SetAllOutOfOrderMergeSwitch(en bool)

func (*Compactor) SetAllShardsCompactionSwitch

func (c *Compactor) SetAllShardsCompactionSwitch(en bool)

func (*Compactor) SetSnapshotColdDuration

func (c *Compactor) SetSnapshotColdDuration(d time.Duration)

func (*Compactor) ShardCompactionSwitch

func (c *Compactor) ShardCompactionSwitch(shid uint64, en bool)

func (*Compactor) ShardOutOfOrderMergeSwitch

func (c *Compactor) ShardOutOfOrderMergeSwitch(shid uint64, en bool)

func (*Compactor) UnregisterShard

func (c *Compactor) UnregisterShard(shardId uint64)

type ConsumeIterator added in v1.5.0

type ConsumeIterator struct {
	// contains filtered or unexported fields
}

func (*ConsumeIterator) Len added in v1.5.0

func (ci *ConsumeIterator) Len() int

func (*ConsumeIterator) Less added in v1.5.0

func (ci *ConsumeIterator) Less(i, j int) bool

func (*ConsumeIterator) Swap added in v1.5.0

func (ci *ConsumeIterator) Swap(i, j int)

type DBPTInfo

type DBPTInfo struct {
	ReplayC chan *raftconn.Commit
	// contains filtered or unexported fields
}

func NewDBPTInfo

func NewDBPTInfo(db string, id uint32, dataPath, walPath string, ctx *metaclient.LoadCtx, ch chan []immutable.FileInfoExtend, options *obs.ObsOptions) *DBPTInfo

func (*DBPTInfo) AddShard added in v1.3.0

func (dbPT *DBPTInfo) AddShard(id uint64, sh Shard)

func (*DBPTInfo) ApplyDroppedSeries added in v1.5.0

func (dbPT *DBPTInfo) ApplyDroppedSeries(ptId uint32, db string, errs []error) []error

func (*DBPTInfo) GetDelIndexBuilderByRp added in v1.5.0

func (dbPT *DBPTInfo) GetDelIndexBuilderByRp(rp string) *tsi.IndexBuilder

func (*DBPTInfo) GetLastIndex added in v1.5.0

func (dbPT *DBPTInfo) GetLastIndex() (uint64, error)

func (*DBPTInfo) HasCoverShard added in v1.5.0

func (dbpt *DBPTInfo) HasCoverShard(srcShardTimeRange *meta.ShardTimeRangeInfo, rp string, shardId uint64) bool

func (*DBPTInfo) IndexNoLock added in v1.5.0

func (dbPT *DBPTInfo) IndexNoLock(id uint64) *tsi.IndexBuilder

func (*DBPTInfo) MarkDropSeries added in v1.5.0

func (dbPT *DBPTInfo) MarkDropSeries(mstName []byte, metaClient metaclient.MetaClient, expr influxql.Expr, t tsi.TimeRange) error

func (*DBPTInfo) NewMergeSetIndex added in v1.5.0

func (dbPT *DBPTInfo) NewMergeSetIndex(rp string, timeRangeInfo *meta.ShardTimeRangeInfo, client metaclient.MetaClient, engineType config.EngineType) (uint64, string, fileops.FileLockOption, *tsi.IndexBuilder, error)

func (*DBPTInfo) NewShard

func (dbPT *DBPTInfo) NewShard(rp string, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo, client metaclient.MetaClient, mstInfo *meta.MeasurementInfo) (Shard, error)

func (*DBPTInfo) OpenIndexes

func (dbPT *DBPTInfo) OpenIndexes(opId uint64, rp string, engineType config.EngineType, client metaclient.MetaClient) error

func (*DBPTInfo) OpenShards

func (dbPT *DBPTInfo) OpenShards(opId uint64, rp string, durationInfos map[uint64]*meta.ShardDurationInfo, loadStat int, client metaclient.MetaClient) error

func (*DBPTInfo) SetDatabase added in v1.5.0

func (dbPT *DBPTInfo) SetDatabase(name string)

SetDatabase is only used for mock tests.

func (*DBPTInfo) SetDelIndexBuilder added in v1.5.0

func (dbPT *DBPTInfo) SetDelIndexBuilder(delIndexBuilder map[string]*tsi.IndexBuilder)

SetDelIndexBuilder is only used for mock tests.

func (*DBPTInfo) SetDoingOff added in v1.5.0

func (dbPT *DBPTInfo) SetDoingOff(df bool)

func (*DBPTInfo) SetIndexBuilder added in v1.5.0

func (dbPT *DBPTInfo) SetIndexBuilder(indexBuilder map[uint64]*tsi.IndexBuilder)

SetIndexBuilder is only used for mock tests.

func (*DBPTInfo) SetLockPath added in v1.5.0

func (dbPT *DBPTInfo) SetLockPath(lockPath *string)

SetLockPath is only used for mock tests.

func (*DBPTInfo) SetNode added in v1.5.0

func (dbPT *DBPTInfo) SetNode(node raftNodeRequest)

SetNode is only used for mock tests.

func (*DBPTInfo) SetOption

func (dbPT *DBPTInfo) SetOption(opt EngineOptions)

func (*DBPTInfo) SetParams added in v1.1.0

func (dbPT *DBPTInfo) SetParams(preload bool, lockPath *string, enableTagArray bool)

func (*DBPTInfo) SetPath added in v1.5.0

func (dbPT *DBPTInfo) SetPath(path string)

SetPath is only used for mock tests.

func (*DBPTInfo) SetShards added in v1.5.0

func (dbPT *DBPTInfo) SetShards(shards map[uint64]Shard)

SetShards is only used for mock tests.

func (*DBPTInfo) SetWalPath added in v1.5.0

func (dbPT *DBPTInfo) SetWalPath(walPath string)

SetWalPath is only used for mock tests.

func (*DBPTInfo) Shard

func (dbPT *DBPTInfo) Shard(id uint64) Shard

func (*DBPTInfo) ShardIds added in v1.2.0

func (dbPT *DBPTInfo) ShardIds(tr *influxql.TimeRange) []uint64

func (*DBPTInfo) ShardNoLock added in v1.3.0

func (dbPT *DBPTInfo) ShardNoLock(id uint64) Shard

func (*DBPTInfo) Shards added in v1.5.0

func (dbPT *DBPTInfo) Shards() map[uint64]Shard

type DDLBasePlans added in v1.4.0

type DDLBasePlans interface {
	Execute(mstKeys map[string][][]byte, condition influxql.Expr, tr util.TimeRange, limit int) (interface{}, error)
	AddPlan(plan interface{})
	Stop()
}

type DDLBasePlansImpl added in v1.5.0

type DDLBasePlansImpl struct {
	// contains filtered or unexported fields
}

func (*DDLBasePlansImpl) AddPlan added in v1.5.0

func (p *DDLBasePlansImpl) AddPlan(plan interface{})

func (*DDLBasePlansImpl) Execute added in v1.5.0

func (p *DDLBasePlansImpl) Execute(tagKeys map[string][][]byte, condition influxql.Expr, tr util.TimeRange, limit int) (interface{}, error)

func (*DDLBasePlansImpl) Stop added in v1.5.0

func (p *DDLBasePlansImpl) Stop()

type DataBlockInfo

type DataBlockInfo struct {
	// contains filtered or unexported fields
}

type DetachedMetaInfo added in v1.2.0

type DetachedMetaInfo struct {
	// contains filtered or unexported fields
}

func NewDetachedMetaInfo added in v1.2.0

func NewDetachedMetaInfo() *DetachedMetaInfo

type DownSampleFilesInfo added in v1.0.0

type DownSampleFilesInfo struct {
	Names    []string
	OldFiles [][]string
	NewFiles [][]string
	// contains filtered or unexported fields
}

type EndPointPair

type EndPointPair struct {
	Record  *record.Record
	Ordinal int
}

type Engine

type Engine interface {
	Open(durationInfos map[uint64]*meta.ShardDurationInfo, dbBriefInfos map[string]*meta.DatabaseBriefInfo, client metaclient.MetaClient) error
	Close() error
	ForceFlush()

	DeleteShard(db string, ptId uint32, shardID uint64) error
	DeleteMstInShard(db string, ptId uint32, shardID uint64, mst string) error
	DeleteIndex(db string, pt uint32, indexID uint64) error
	DeleteMstInIndex(db string, ptId uint32, indexID uint64, msts []string, onlyUseDiskThreshold uint64) error
	ClearIndexCache(db string, pt uint32, indexID uint64) error
	ExpiredShards(nilShardMap *map[uint64]*meta.ShardDurationInfo) []*meta.ShardIdentifier
	ExpiredShardsForMst(db, rp string, mst *meta.MeasurementTTLTnfo) []*meta.ShardIdentifier
	ExpiredIndexes(nilIndexMap *map[uint64]*meta.IndexDurationInfo) []*meta.IndexIdentifier
	ExpiredIndexesForMst(db, rp string, mst *meta.MeasurementTTLTnfo) []*meta.IndexIdentifier
	ExpiredCacheIndexes() []*meta.IndexIdentifier
	FetchShardsNeedChangeStore() ([]*meta.ShardIdentifier, []*meta.ShardIdentifier)
	FetchIndexesNeedChangeStore() (indexesToCold []*meta.IndexIdentifier)
	ChangeShardTierToWarm(db string, ptId uint32, shardID uint64) error

	CreateShard(db, rp string, ptId uint32, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo, mstInfo *meta.MeasurementInfo) error
	WriteRows(db, rp string, ptId uint32, shardID uint64, points []influx.Row, binaryRows []byte, snp *raftlog.SnapShotter) error
	WriteRec(db, mst string, ptId uint32, shardID uint64, rec *record.Record, binaryRec []byte) error
	WriteBlobs(db string, ptId uint32, shardID uint64, group *shelf.BlobGroup) error
	WriteToRaft(db, rp string, ptId uint32, tail []byte) error
	CreateDBPT(db string, pt uint32, enableTagArray bool)

	GetShardDownSampleLevel(db string, ptId uint32, shardID uint64) int

	GetShardSplitPoints(db string, ptId uint32, shardID uint64, idxes []int64) ([]string, error)

	DeleteDatabase(db string, ptId uint32) error

	DropRetentionPolicy(db string, rp string, ptId uint32) error

	DropMeasurement(db string, rp string, name string, shardIds []uint64) error

	TagKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]string, error)

	SeriesKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]string, error)
	SeriesCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]meta.MeasurementCardinalityInfo, error)
	SeriesExactCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) (map[string]uint64, error)

	TagValues(db string, ptId []uint32, tagKeys map[string][][]byte, condition influxql.Expr, tr influxql.TimeRange) (influxql.TablesTagSets, error)
	TagValuesCardinality(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr, tr influxql.TimeRange) (map[string]uint64, error)
	DropSeries() error
	MarkDropSeries(db string, ptID uint32, mstName []byte, expr influxql.Expr, t tsi.TimeRange) error

	DbPTRef(db string, ptId uint32) error
	DbPTUnref(db string, ptId uint32)
	CreateLogicalPlan(ctx context.Context, db string, ptId uint32, shardID []uint64, sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)
	ScanWithSparseIndex(ctx context.Context, db string, ptId uint32, shardIDs []uint64, schema *executor.QuerySchema) (executor.ShardsFragments, error)
	GetIndexInfo(db string, ptId uint32, shardIDs uint64, schema *executor.QuerySchema) (*executor.AttachedIndexInfo, error)
	RowCount(db string, ptId uint32, shardIDs []uint64, schema *executor.QuerySchema) (int64, error)

	LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

	UpdateShardDurationInfo(info *meta.ShardDurationInfo, nilShardMap *map[uint64]*meta.ShardDurationInfo) error
	UpdateIndexDurationInfo(info *meta.IndexDurationInfo, nilIndexMap *map[uint64]*meta.IndexDurationInfo) error

	PreOffload(opId uint64, db string, ptId uint32) error
	RollbackPreOffload(opId uint64, db string, ptId uint32) error
	PreAssign(opId uint64, db string, ptId uint32, durationInfos map[uint64]*meta.ShardDurationInfo, dbBriefInfo *meta.DatabaseBriefInfo, client metaclient.MetaClient) error
	Offload(opId uint64, db string, ptId uint32) error
	Assign(opId uint64, nodeId uint64, db string, ptId uint32, ver uint64, durationInfos map[uint64]*meta.ShardDurationInfo, dbBriefInfo *meta.DatabaseBriefInfo, client metaclient.MetaClient, storage StorageService) error

	SysCtrl(req *msgservice.SysCtrlRequest) (map[string]string, error)
	Statistics(buffer []byte) ([]byte, error)
	StatisticsOps() []opsStat.OpsStatistic

	GetShardDownSamplePolicyInfos(meta MetaDownSample) ([]*meta.ShardDownSamplePolicyInfo, error)
	GetDownSamplePolicy(key string) *meta.StoreDownSamplePolicy
	StartDownSampleTask(info *meta.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger, meta MetaDownSample) error
	UpdateDownSampleInfo(policies *meta.DownSamplePoliciesInfoWithDbRp)
	UpdateShardDownSampleInfo(infos *meta.ShardDownSampleUpdateInfos)
	CheckPtsRemovedDone() bool
	TransferLeadership(database string, nodeId uint64, oldMasterPtId, newMasterPtId uint32) error
	HierarchicalStorage(db string, ptId uint32, shardID uint64) bool
	IndexHierarchicalStorage(db string, ptId uint32, shardID uint64) bool

	RaftMessage
	CreateDDLBasePlans(planType hybridqp.DDLType, db string, ptIDs []uint32, tr *influxql.TimeRange) DDLBasePlans
	CreateConsumeIterator(database, mst string, opt *query.ProcessorOptions) []record.Iterator
	SetMetaClient(m metaclient.MetaClient)

	RegisterOnPTLoaded(id uint64, f PtLoadFunc)
	UninstallOnPTLoaded(id uint64)
	RegisterOnPTOffload(id uint64, f func(ptID uint32))
	UninstallOnPTOffload(id uint64)

	MergeShards(meta.MergeShards) error
	ClearRepCold(req *msgservice.SendClearEventsRequest) error
	GetDatabase(database string) map[uint32]*DBPTInfo
	OpenShardLazy(sh Shard) error

	GetLastIndex(db string, ptId uint32) (uint64, error)
}

func NewEngine

func NewEngine(dataPath, walPath string, options EngineOptions, ctx *meta.LoadCtx) (Engine, error)
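
A hedged construction sketch: the paths and the empty meta maps are placeholders, and a real caller would pass a live metaclient.MetaClient rather than nil.

func openEngine() (Engine, error) {
	opts := NewEngineOptions()
	e, err := NewEngine("/path/to/data", "/path/to/wal", opts, &meta.LoadCtx{})
	if err != nil {
		return nil, err
	}
	// Open with no pre-known shard durations or database brief infos.
	if err := e.Open(map[uint64]*meta.ShardDurationInfo{}, map[string]*meta.DatabaseBriefInfo{}, nil); err != nil {
		return nil, err
	}
	return e, nil
}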

type EngineImpl added in v1.5.0

type EngineImpl struct {
	DBPartitions map[string]map[uint32]*DBPTInfo

	DownSamplePolicies map[string]*meta2.StoreDownSamplePolicy
	// contains filtered or unexported fields
}

func (*EngineImpl) Assign added in v1.5.0

func (e *EngineImpl) Assign(opId uint64, nodeId uint64, db string, ptId uint32, ver uint64, durationInfos map[uint64]*meta2.ShardDurationInfo, dbBriefInfo *meta2.DatabaseBriefInfo, client metaclient.MetaClient, storage StorageService) error

func (*EngineImpl) ChangeShardTierToWarm added in v1.5.0

func (e *EngineImpl) ChangeShardTierToWarm(db string, ptId uint32, shardID uint64) error

func (*EngineImpl) CheckPtsRemovedDone added in v1.5.0

func (e *EngineImpl) CheckPtsRemovedDone() bool

func (*EngineImpl) ClearIndexCache added in v1.5.0

func (e *EngineImpl) ClearIndexCache(db string, ptId uint32, indexID uint64) error

func (*EngineImpl) ClearIndexRepCold added in v1.5.0

func (e *EngineImpl) ClearIndexRepCold(database string, rp string, ptId uint32, index *msgservice_data.IndexPair) error

func (*EngineImpl) ClearRepCold added in v1.5.0

func (e *EngineImpl) ClearRepCold(req *msgservice.SendClearEventsRequest) error

func (*EngineImpl) ClearShardRepCold added in v1.5.0

func (e *EngineImpl) ClearShardRepCold(database string, rp string, ptId uint32, shardPair *msgservice_data.ShardPair) error

func (*EngineImpl) Close added in v1.5.0

func (e *EngineImpl) Close() error

func (*EngineImpl) CreateConsumeIterator added in v1.5.0

func (e *EngineImpl) CreateConsumeIterator(db, mst string, opt *query.ProcessorOptions) []record.Iterator

func (*EngineImpl) CreateDBPT added in v1.5.0

func (e *EngineImpl) CreateDBPT(db string, pt uint32, enableTagArray bool)

func (*EngineImpl) CreateDDLBasePlans added in v1.5.0

func (e *EngineImpl) CreateDDLBasePlans(planType hybridqp.DDLType, db string, ptIDs []uint32, tr *influxql.TimeRange) DDLBasePlans

func (*EngineImpl) CreateLogicalPlan added in v1.5.0

func (e *EngineImpl) CreateLogicalPlan(ctx context.Context, db string, ptId uint32, shardID []uint64,
	sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

func (*EngineImpl) CreateLogicalPlanCrossShard added in v1.5.0

func (e *EngineImpl) CreateLogicalPlanCrossShard(ctx context.Context, db string, ptId uint32, shardID []uint64,
	sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

func (*EngineImpl) CreateLogicalPlanOneShard added in v1.5.0

func (e *EngineImpl) CreateLogicalPlanOneShard(ctx context.Context, db string, ptId uint32, shardID []uint64,
	sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

func (*EngineImpl) CreateShard added in v1.5.0

func (e *EngineImpl) CreateShard(db, rp string, ptId uint32, shardID uint64, timeRangeInfo *meta2.ShardTimeRangeInfo, mstInfo *meta2.MeasurementInfo) error

func (*EngineImpl) Databases added in v1.5.0

func (e *EngineImpl) Databases() []string

func (*EngineImpl) DbPTRef added in v1.5.0

func (e *EngineImpl) DbPTRef(db string, ptId uint32) error

func (*EngineImpl) DbPTUnref added in v1.5.0

func (e *EngineImpl) DbPTUnref(db string, ptId uint32)

func (*EngineImpl) DeleteDatabase added in v1.5.0

func (e *EngineImpl) DeleteDatabase(db string, ptId uint32) (err error)

func (*EngineImpl) DeleteIndex added in v1.5.0

func (e *EngineImpl) DeleteIndex(db string, ptId uint32, indexID uint64) error

func (*EngineImpl) DeleteMstInIndex added in v1.5.0

func (e *EngineImpl) DeleteMstInIndex(db string, ptId uint32, indexID uint64, msts []string, onlyUseDiskThreshold uint64) error

func (*EngineImpl) DeleteMstInShard added in v1.5.0

func (e *EngineImpl) DeleteMstInShard(db string, ptId uint32, shardID uint64, mst string) error

func (*EngineImpl) DeleteShard added in v1.5.0

func (e *EngineImpl) DeleteShard(db string, ptId uint32, shardID uint64) error

TODO: needs confirmation.

func (*EngineImpl) DropMeasurement added in v1.5.0

func (e *EngineImpl) DropMeasurement(db string, rp string, name string, shardIds []uint64) error

func (*EngineImpl) DropRetentionPolicy added in v1.5.0

func (e *EngineImpl) DropRetentionPolicy(db string, rp string, ptId uint32) error

func (*EngineImpl) DropSeries added in v1.5.0

func (e *EngineImpl) DropSeries() error

func (*EngineImpl) ExpiredCacheIndexes added in v1.5.0

func (e *EngineImpl) ExpiredCacheIndexes() []*meta2.IndexIdentifier

func (*EngineImpl) ExpiredIndexes added in v1.5.0

func (e *EngineImpl) ExpiredIndexes(nilIndexMap *map[uint64]*meta2.IndexDurationInfo) []*meta2.IndexIdentifier

func (*EngineImpl) ExpiredIndexesForMst added in v1.5.0

func (e *EngineImpl) ExpiredIndexesForMst(db, rp string, mst *meta2.MeasurementTTLTnfo) []*meta2.IndexIdentifier

func (*EngineImpl) ExpiredShards added in v1.5.0

func (e *EngineImpl) ExpiredShards(nilShardMap *map[uint64]*meta2.ShardDurationInfo) []*meta2.ShardIdentifier

func (*EngineImpl) ExpiredShardsForMst added in v1.5.0

func (e *EngineImpl) ExpiredShardsForMst(db, rp string, mst *meta2.MeasurementTTLTnfo) []*meta2.ShardIdentifier

func (*EngineImpl) FetchIndexesNeedChangeStore added in v1.5.0

func (e *EngineImpl) FetchIndexesNeedChangeStore() (indexesToCold []*meta2.IndexIdentifier)

func (*EngineImpl) FetchShardsNeedChangeStore added in v1.5.0

func (e *EngineImpl) FetchShardsNeedChangeStore() (shardsToWarm, shardsToCold []*meta2.ShardIdentifier)

func (*EngineImpl) ForceFlush added in v1.5.0

func (e *EngineImpl) ForceFlush()

func (*EngineImpl) GetDBPtIds added in v1.5.0

func (e *EngineImpl) GetDBPtIds() map[string][]uint32

func (*EngineImpl) GetDatabase added in v1.5.0

func (e *EngineImpl) GetDatabase(database string) map[uint32]*DBPTInfo

func (*EngineImpl) GetDownSamplePolicy added in v1.5.0

func (e *EngineImpl) GetDownSamplePolicy(key string) *meta2.StoreDownSamplePolicy

func (*EngineImpl) GetIndexInfo added in v1.5.0

func (e *EngineImpl) GetIndexInfo(db string, ptId uint32, shardID uint64, schema *executor.QuerySchema) (*executor.AttachedIndexInfo, error)

func (*EngineImpl) GetLastIndex added in v1.5.0

func (e *EngineImpl) GetLastIndex(db string, ptId uint32) (uint64, error)

func (*EngineImpl) GetLockFile added in v1.5.0

func (e *EngineImpl) GetLockFile() string

func (*EngineImpl) GetShard added in v1.5.0

func (e *EngineImpl) GetShard(db string, ptId uint32, shardID uint64) (Shard, error)

func (*EngineImpl) GetShardAndDbPt added in v1.5.0

func (e *EngineImpl) GetShardAndDbPt(db string, ptId uint32, shardID uint64) (Shard, *DBPTInfo, error)

GetShardAndDbPt returns the Shard and its DBPTInfo for the write API.

func (*EngineImpl) GetShardDownSampleLevel added in v1.5.0

func (e *EngineImpl) GetShardDownSampleLevel(db string, ptId uint32, shardID uint64) int

func (*EngineImpl) GetShardDownSamplePolicyInfos added in v1.5.0

func (e *EngineImpl) GetShardDownSamplePolicyInfos(meta MetaDownSample) ([]*meta2.ShardDownSamplePolicyInfo, error)

func (*EngineImpl) GetShardSplitPoints added in v1.5.0

func (e *EngineImpl) GetShardSplitPoints(db string, ptId uint32, shardID uint64, idxes []int64) ([]string, error)

func (*EngineImpl) HierarchicalStorage added in v1.5.0

func (e *EngineImpl) HierarchicalStorage(db string, ptId uint32, shardID uint64) bool

func (*EngineImpl) IndexHierarchicalStorage added in v1.5.0

func (e *EngineImpl) IndexHierarchicalStorage(db string, ptId uint32, indexID uint64) bool

func (*EngineImpl) InitLogStoreCtx added in v1.5.0

func (e *EngineImpl) InitLogStoreCtx(querySchema *executor.QuerySchema) (*idKeyCursorContext, error)

func (*EngineImpl) LogicalPlanCost added in v1.5.0

func (e *EngineImpl) LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

func (*EngineImpl) MarkDropSeries added in v1.5.0

func (e *EngineImpl) MarkDropSeries(db string, ptID uint32, mstName []byte, expr influxql.Expr, t tsi.TimeRange) error

func (*EngineImpl) MergeShards added in v1.5.0

func (e *EngineImpl) MergeShards(mergeShards meta2.MergeShards) error

MergeShards processes a list of shards to be merged.

func (*EngineImpl) Offload added in v1.5.0

func (e *EngineImpl) Offload(opId uint64, db string, ptId uint32) error

func (*EngineImpl) Open added in v1.5.0

func (e *EngineImpl) Open(durationInfos map[uint64]*meta2.ShardDurationInfo, dbBriefInfos map[string]*meta2.DatabaseBriefInfo, m meta.MetaClient) error

func (*EngineImpl) OpenShardLazy added in v1.5.0

func (e *EngineImpl) OpenShardLazy(sh Shard) error

func (*EngineImpl) PreAssign added in v1.5.0

func (e *EngineImpl) PreAssign(opId uint64, db string, ptId uint32, durationInfos map[uint64]*meta2.ShardDurationInfo, dbBriefInfo *meta2.DatabaseBriefInfo, client metaclient.MetaClient) error

func (*EngineImpl) PreOffload added in v1.5.0

func (e *EngineImpl) PreOffload(opId uint64, db string, ptId uint32) error

func (*EngineImpl) RegisterOnPTLoaded added in v1.5.0

func (e *EngineImpl) RegisterOnPTLoaded(id uint64, f PtLoadFunc)

func (*EngineImpl) RegisterOnPTOffload added in v1.5.0

func (e *EngineImpl) RegisterOnPTOffload(id uint64, f func(ptID uint32))

func (*EngineImpl) ReplaceShards added in v1.5.0

func (e *EngineImpl) ReplaceShards(newShard Shard, shards []Shard, mergeShardPath string, dbpt *DBPTInfo, mergeShards meta2.MergeShards) error

func (*EngineImpl) RollbackPreOffload added in v1.5.0

func (e *EngineImpl) RollbackPreOffload(opId uint64, db string, ptId uint32) error

func (*EngineImpl) RowCount added in v1.5.0

func (e *EngineImpl) RowCount(db string, ptId uint32, shardIDs []uint64, schema *executor.QuerySchema) (int64, error)

func (*EngineImpl) ScanWithSparseIndex added in v1.5.0

func (e *EngineImpl) ScanWithSparseIndex(ctx context.Context, db string, ptId uint32, shardIDs []uint64, schema *executor.QuerySchema) (executor.ShardsFragments, error)

func (*EngineImpl) SendRaftMessage added in v1.5.0

func (e *EngineImpl) SendRaftMessage(database string, ptId uint64, msgs raftpb.Message) error

func (*EngineImpl) SeriesCardinality added in v1.5.0

func (e *EngineImpl) SeriesCardinality(db string, ptIDs []uint32, namesWithVer [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]meta2.MeasurementCardinalityInfo, error)

func (*EngineImpl) SeriesExactCardinality added in v1.5.0

func (e *EngineImpl) SeriesExactCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) (map[string]uint64, error)

func (*EngineImpl) SeriesKeys added in v1.5.0

func (e *EngineImpl) SeriesKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]string, error)

func (*EngineImpl) SetMetaClient added in v1.5.0

func (e *EngineImpl) SetMetaClient(m meta.MetaClient)

func (*EngineImpl) StartDownSampleTask added in v1.5.0

func (e *EngineImpl) StartDownSampleTask(sdsp *meta2.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger,
	meta MetaDownSample) error

func (*EngineImpl) Statistics added in v1.5.0

func (e *EngineImpl) Statistics(buffer []byte) ([]byte, error)

func (*EngineImpl) StatisticsOps added in v1.5.0

func (e *EngineImpl) StatisticsOps() []opsStat.OpsStatistic

func (*EngineImpl) SysCtrl added in v1.5.0

func (e *EngineImpl) SysCtrl(req *msgservice.SysCtrlRequest) (map[string]string, error)

func (*EngineImpl) TagKeys added in v1.5.0

func (e *EngineImpl) TagKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr, tr influxql.TimeRange) ([]string, error)

func (*EngineImpl) TagValues added in v1.5.0

func (e *EngineImpl) TagValues(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr, tr influxql.TimeRange) (influxql.TablesTagSets, error)

func (*EngineImpl) TagValuesCardinality added in v1.5.0

func (e *EngineImpl) TagValuesCardinality(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr, tr influxql.TimeRange) (map[string]uint64, error)

func (*EngineImpl) TransferLeadership added in v1.5.0

func (e *EngineImpl) TransferLeadership(database string, nodeId uint64, oldMasterPtId, newMasterPtId uint32) error

func (*EngineImpl) UninstallOnPTLoaded added in v1.5.0

func (e *EngineImpl) UninstallOnPTLoaded(id uint64)

func (*EngineImpl) UninstallOnPTOffload added in v1.5.0

func (e *EngineImpl) UninstallOnPTOffload(id uint64)

func (*EngineImpl) UpdateDownSampleInfo added in v1.5.0

func (e *EngineImpl) UpdateDownSampleInfo(policies *meta2.DownSamplePoliciesInfoWithDbRp)

func (*EngineImpl) UpdateIndexDurationInfo added in v1.5.0

func (e *EngineImpl) UpdateIndexDurationInfo(info *meta2.IndexDurationInfo, nilIndexMap *map[uint64]*meta2.IndexDurationInfo) error

func (*EngineImpl) UpdateShardDownSampleInfo added in v1.5.0

func (e *EngineImpl) UpdateShardDownSampleInfo(infos *meta2.ShardDownSampleUpdateInfos)

func (*EngineImpl) UpdateShardDurationInfo added in v1.5.0

func (e *EngineImpl) UpdateShardDurationInfo(info *meta2.ShardDurationInfo, nilShardMap *map[uint64]*meta2.ShardDurationInfo) error

func (*EngineImpl) UpdateStoreDownSamplePolicies added in v1.5.0

func (e *EngineImpl) UpdateStoreDownSamplePolicies(info *meta2.DownSamplePolicyInfo, ident string)

func (*EngineImpl) WriteBlobs added in v1.5.0

func (e *EngineImpl) WriteBlobs(db string, ptId uint32, shardID uint64, group *shelf.BlobGroup) error

func (*EngineImpl) WriteRec added in v1.5.0

func (e *EngineImpl) WriteRec(db, mst string, ptId uint32, shardID uint64, rec *record.Record, binaryRec []byte) error

WriteRec only works for the column store.

func (*EngineImpl) WriteRows added in v1.5.0

func (e *EngineImpl) WriteRows(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, binaryRows []byte, snp *raftlog.SnapShotter) error

func (*EngineImpl) WriteToRaft added in v1.5.0

func (e *EngineImpl) WriteToRaft(db, rp string, ptId uint32, tail []byte) error

type EngineOptions added in v1.5.0

type EngineOptions struct {
	Version string
	// Limits the concurrent number of TSM files that can be loaded at once.
	OpenLimiter limiter.Fixed

	// WriteColdDuration is the length of time after which the engine will snapshot the mutable table if it has received no writes.
	WriteColdDuration time.Duration

	// ForceSnapShotDuration is the length of time after which the engine must snapshot the mutable table, regardless of write activity.
	ForceSnapShotDuration time.Duration

	// ShardMutableSizeLimit is the maximum size a shard's cache can reach before it starts rejecting writes.
	ShardMutableSizeLimit int64

	// NodeMutableSizeLimit is the maximum size a node's cache can reach before it starts rejecting writes.
	NodeMutableSizeLimit int64

	// MaxWriteHangTime is the maximum time a data write to the store may hang when node memory is insufficient.
	MaxWriteHangTime time.Duration

	// MemDataReadEnabled enables reads from in-memory data (mutable and snapshot tables); the default is true.
	MemDataReadEnabled bool

	FullCompactColdDuration      time.Duration
	MaxConcurrentCompactions     int
	MaxFullCompactions           int
	CompactThroughput            int64
	CompactThroughputBurst       int64
	CompactRecovery              bool
	CsDetachedFlushEnabled       bool
	ShardMoveLayoutSwitchEnabled bool
	SnapshotThroughput           int64
	SnapshotThroughputBurst      int64
	SnapshotTblNum               int
	FragmentsNumPerFlush         int
	BackgroundReadThroughput     int

	// WAL options; WalSyncInterval is the interval between WAL file syncs.
	WalEnabled         bool
	WalSyncInterval    time.Duration
	WalReplayParallel  bool
	WalReplayAsync     bool
	WalReplayBatchSize int

	// Immutable config
	ReadPageSize       string
	ReadMetaPageSize   []string
	ReadMetaCacheLimit uint64
	ReadDataCacheLimit uint64
	EnableMmapRead     bool
	CompactionMethod   int // 0:auto, 1:stream, 2: non-stream

	OpenShardLimit int
	// lazy load shards
	LazyLoadShardEnable       bool
	ThermalShardStartDuration time.Duration
	ThermalShardEndDuration   time.Duration

	DownSampleWriteDrop          bool
	MaxDownSampleTaskConcurrency int

	MaxSeriesPerDatabase int
	MaxRowsPerSegment    int

	// for hierarchical storage
	SkipRegisterColdShard bool

	// for raftNode
	RaftEntrySyncInterval time.Duration
}

func NewEngineOptions added in v1.5.0

func NewEngineOptions() EngineOptions

type FieldIter added in v1.0.0

type FieldIter struct {
	// contains filtered or unexported fields
}

func NewFieldIter added in v1.0.0

func NewFieldIter(querySchema *executor.QuerySchema) *FieldIter

func (*FieldIter) GetRecordSchemas added in v1.0.0

func (r *FieldIter) GetRecordSchemas() record.Schemas

func (*FieldIter) ResetPos added in v1.0.0

func (r *FieldIter) ResetPos()

type FileSequenceAggregator added in v1.0.0

type FileSequenceAggregator struct {
	executor.BaseProcessor

	Input  *executor.SeriesRecordPort
	Output *executor.SeriesRecordPort
	// contains filtered or unexported fields
}

func (*FileSequenceAggregator) Aggregate added in v1.0.0

func (r *FileSequenceAggregator) Aggregate()

func (*FileSequenceAggregator) AggregateSameSchema added in v1.0.0

func (r *FileSequenceAggregator) AggregateSameSchema() error

func (*FileSequenceAggregator) Close added in v1.0.0

func (r *FileSequenceAggregator) Close()

func (*FileSequenceAggregator) Create added in v1.0.0

func (*FileSequenceAggregator) Explain added in v1.0.0

func (r *FileSequenceAggregator) Explain() []executor.ValuePair

func (*FileSequenceAggregator) GetInputNumber added in v1.0.0

func (r *FileSequenceAggregator) GetInputNumber(executor.Port) int

func (*FileSequenceAggregator) GetInputs added in v1.0.0

func (r *FileSequenceAggregator) GetInputs() executor.Ports

func (*FileSequenceAggregator) GetOutputNumber added in v1.0.0

func (r *FileSequenceAggregator) GetOutputNumber(executor.Port) int

func (*FileSequenceAggregator) GetOutputs added in v1.0.0

func (r *FileSequenceAggregator) GetOutputs() executor.Ports

func (*FileSequenceAggregator) GetProcessors added in v1.0.0

func (r *FileSequenceAggregator) GetProcessors()

func (*FileSequenceAggregator) IsSink added in v1.0.0

func (r *FileSequenceAggregator) IsSink() bool

func (*FileSequenceAggregator) Name added in v1.0.0

func (r *FileSequenceAggregator) Name() string

func (*FileSequenceAggregator) Release added in v1.0.0

func (r *FileSequenceAggregator) Release() error

func (*FileSequenceAggregator) SendRecord added in v1.0.0

func (r *FileSequenceAggregator) SendRecord(re *record.Record)

func (*FileSequenceAggregator) Work added in v1.0.0

type FilterCursor added in v1.5.0

type FilterCursor struct {
	// contains filtered or unexported fields
}

func NewFilterCursor added in v1.5.0

func NewFilterCursor(schema *executor.QuerySchema, tagSets []tsi.TagSet, itr comm.KeyCursor, schemas record.Schemas,
	filterOption immutable.BaseFilterOptions) (*FilterCursor, error)

func (*FilterCursor) Close added in v1.5.0

func (fc *FilterCursor) Close() error

func (*FilterCursor) EndSpan added in v1.5.0

func (fc *FilterCursor) EndSpan()

func (*FilterCursor) GetSchema added in v1.5.0

func (fc *FilterCursor) GetSchema() record.Schemas

func (*FilterCursor) Name added in v1.5.0

func (fc *FilterCursor) Name() string

func (*FilterCursor) Next added in v1.5.0

func (*FilterCursor) NextAggData added in v1.5.0

func (fc *FilterCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*FilterCursor) SetOps added in v1.5.0

func (fc *FilterCursor) SetOps(ops []*comm.CallOption)

func (*FilterCursor) SinkPlan added in v1.5.0

func (fc *FilterCursor) SinkPlan(plan hybridqp.QueryNode)

func (*FilterCursor) StartSpan added in v1.5.0

func (fc *FilterCursor) StartSpan(span *tracing.Span)

type FloatCountPromOp added in v1.3.0

type FloatCountPromOp struct {
	BasePromOp
}

func (*FloatCountPromOp) CreateRoutine added in v1.3.0

func (c *FloatCountPromOp) CreateRoutine(p *AggParams) (Routine, error)

type FloatHeapItem

type FloatHeapItem struct {
	// contains filtered or unexported fields
}

func NewFloatHeapItem

func NewFloatHeapItem(n int, cmpByValue, cmpByTime func(a, b *FloatPointItem) bool) *FloatHeapItem

func (*FloatHeapItem) Len

func (f *FloatHeapItem) Len() int

func (*FloatHeapItem) Less

func (f *FloatHeapItem) Less(i, j int) bool

func (*FloatHeapItem) Pop

func (f *FloatHeapItem) Pop() interface{}

func (*FloatHeapItem) Push

func (f *FloatHeapItem) Push(x interface{})

func (*FloatHeapItem) Reset

func (f *FloatHeapItem) Reset()

func (*FloatHeapItem) Swap

func (f *FloatHeapItem) Swap(i, j int)

type FloatIncAggMergeFunc added in v1.3.0

type FloatIncAggMergeFunc func(prevValue float64, currValue float64, prevCount, currCount int) (float64, int)

FloatIncAggMergeFunc is used to calculate the final result based on the intermediate result and function definition.

type FloatIncAggReduceFunc added in v1.3.0

type FloatIncAggReduceFunc func(times []int64, values []float64, start, end int) (int64, float64, bool)

FloatIncAggReduceFunc is the incAgg reducer; it is used to process intermediate calculation results.

type FloatPointItem

type FloatPointItem struct {
	// contains filtered or unexported fields
}

func NewFloatPointItem

func NewFloatPointItem(time int64, value float64) *FloatPointItem

type FloatRateMergeFunc added in v1.3.0

type FloatRateMergeFunc func(prevTime int64, currTime int64, prevValue float64, currValue float64, ts int64, pointCount int, param *ReducerParams) (float64, bool)

FloatRateMergeFunc is used to calculate the final result based on the intermediate result and function definition.

type FloatRateReduceFunc added in v1.3.0

type FloatRateReduceFunc func(times []int64, values []float64, start, end int) (int64, int64, float64, float64, bool)

FloatRateReduceFunc is the rate reducer; it is used to process intermediate calculation results, e.g., calculating the first and last time points for rate.
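
A hedged example matching the signature above; the exact ordering of the returned tuple (first time, last time, first value, last value, ok) is an assumption based on the description.

var rateReduce FloatRateReduceFunc = func(times []int64, values []float64, start, end int) (int64, int64, float64, float64, bool) {
	if start >= end {
		return 0, 0, 0, 0, false // empty window
	}
	// Return the first and last points of the window for the merge step.
	return times[start], times[end-1], values[start], values[end-1], true
}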

type FloatRateUpdateFunc added in v1.3.0

type FloatRateUpdateFunc func(ft1, ft2, lt1, lt2 int64, fv1, fv2, lv1, lv2 float64) (int64, int64, float64, float64)

FloatRateUpdateFunc is used to exchange intermediate calculation results, e.g. to merge the first/last point pairs of two batches.
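
A sketch of how the three rate shapes fit together, simplified for illustration: there is no boundary extrapolation, and the ts, pointCount, and *ReducerParams arguments of FloatRateMergeFunc are dropped. This is not the package's built-in behavior.

package main

import "fmt"

// rateReduce matches FloatRateReduceFunc: pick the first and last point
// of a window (first time, last time, first value, last value, isNil).
func rateReduce(times []int64, values []float64, start, end int) (int64, int64, float64, float64, bool) {
	if start >= end {
		return 0, 0, 0, 0, true
	}
	return times[start], times[end-1], values[start], values[end-1], false
}

// rateUpdate matches FloatRateUpdateFunc: merge two first/last pairs,
// keeping the earliest first point and the latest last point.
func rateUpdate(ft1, ft2, lt1, lt2 int64, fv1, fv2, lv1, lv2 float64) (int64, int64, float64, float64) {
	ft, fv := ft1, fv1
	if ft2 < ft1 {
		ft, fv = ft2, fv2
	}
	lt, lv := lt1, lv1
	if lt2 > lt1 {
		lt, lv = lt2, lv2
	}
	return ft, lt, fv, lv
}

// rateMerge is a simplified FloatRateMergeFunc: turn the merged first/last
// pair into a per-unit-time rate (the real merge also extrapolates to the
// window boundaries).
func rateMerge(prevTime, currTime int64, prevValue, currValue float64) (float64, bool) {
	if currTime == prevTime {
		return 0, true
	}
	return (currValue - prevValue) / float64(currTime-prevTime), false
}

func main() {
	ft, lt, fv, lv, _ := rateReduce([]int64{1, 2, 4}, []float64{10, 12, 16}, 0, 3)
	ft, lt, fv, lv = rateUpdate(ft, 0, lt, 6, fv, 8, lv, 20)
	r, _ := rateMerge(ft, lt, fv, lv)
	fmt.Println(r) // (20-8)/(6-0) = 2
}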

type FloatSliceMergeFunc added in v1.3.0

type FloatSliceMergeFunc func(prevT, currT []int64, prevV, currV []float64, ts int64, count int, param *ReducerParams) (float64, bool)

type FloatSliceReduceFunc added in v1.3.0

type FloatSliceReduceFunc func(times []int64, values []float64, start, end int) ([]int64, []float64, bool)

FloatSliceReduceFunc is the slice reducer; it returns the times and values of a window as slices.
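
The slice shapes follow the same reduce/merge pattern but keep whole windows. A minimal sketch matching the FloatSliceReduceFunc shape, assuming the obvious reading that it returns the window's sub-slices (the bool reports an empty window):

func sliceReduce(times []int64, values []float64, start, end int) ([]int64, []float64, bool) {
	if start >= end {
		return nil, nil, true
	}
	return times[start:end], values[start:end], false
}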

type HybridStoreReader added in v1.2.0

type HybridStoreReader struct {
	executor.BaseProcessor
	// contains filtered or unexported fields
}

func NewHybridStoreReader added in v1.2.0

func NewHybridStoreReader(plan hybridqp.QueryNode, indexInfo *executor.CSIndexInfo) *HybridStoreReader

func (*HybridStoreReader) Abort added in v1.2.0

func (r *HybridStoreReader) Abort()

func (*HybridStoreReader) Close added in v1.2.0

func (r *HybridStoreReader) Close()

func (*HybridStoreReader) CreateCursors added in v1.2.0

func (r *HybridStoreReader) CreateCursors() error

func (*HybridStoreReader) Explain added in v1.2.0

func (r *HybridStoreReader) Explain() []executor.ValuePair

func (*HybridStoreReader) GetInputNumber added in v1.2.0

func (r *HybridStoreReader) GetInputNumber(_ executor.Port) int

func (*HybridStoreReader) GetInputs added in v1.2.0

func (r *HybridStoreReader) GetInputs() executor.Ports

func (*HybridStoreReader) GetOutputNumber added in v1.2.0

func (r *HybridStoreReader) GetOutputNumber(_ executor.Port) int

func (*HybridStoreReader) GetOutputs added in v1.2.0

func (r *HybridStoreReader) GetOutputs() executor.Ports

func (*HybridStoreReader) IsSink added in v1.2.0

func (r *HybridStoreReader) IsSink() bool

func (*HybridStoreReader) Name added in v1.2.0

func (r *HybridStoreReader) Name() string

func (*HybridStoreReader) Release added in v1.2.0

func (r *HybridStoreReader) Release() error

func (*HybridStoreReader) Work added in v1.2.0

func (r *HybridStoreReader) Work(ctx context.Context) (err error)

type IndexReader added in v1.2.0

type IndexReader interface {
	CreateCursors() ([]comm.KeyCursor, int, error)
	StartSpan(span *tracing.Span)
}

type InstantVectorCursor added in v1.3.0

type InstantVectorCursor struct {
	// contains filtered or unexported fields
}

InstantVectorCursor is used to sample and process data for prom instant_query or range_query. Sampling is the first step in processing the original data for both non-aggregated and aggregated queries. For a non-aggregated query, place InstantVectorCursor after seriesCursor; for an aggregated query, replace aggregateCursor with InstantVectorCursor. For example:

	data: (time, value) => [(1, 1.0), (2, 2.0), (3, 3.0), (4, 4.0), (6, 6.0)]
	start=1, end=6, offset=0, step=2, LookUpDelta=3 => startSample=1, endSample=5
	sample data: (time, value) => [(1, 1.0), (3, 3.0), (5, 4.0)]
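
A runnable sketch of the sampling rule described above: for each sample timestamp ts (start..end by step), emit the most recent point whose time lies in (ts-LookUpDelta, ts]. It reproduces the example on plain slices; the real cursor operates on record.Record batches.

package main

import "fmt"

func sampleInstant(times []int64, values []float64, start, end, step, lookUpDelta int64) ([]int64, []float64) {
	var outT []int64
	var outV []float64
	for ts := start; ts <= end; ts += step {
		idx := -1
		for i, t := range times { // times are ascending
			if t > ts {
				break
			}
			if t > ts-lookUpDelta {
				idx = i // latest point inside the lookback window
			}
		}
		if idx >= 0 {
			outT = append(outT, ts)
			outV = append(outV, values[idx])
		}
	}
	return outT, outV
}

func main() {
	t, v := sampleInstant([]int64{1, 2, 3, 4, 6}, []float64{1, 2, 3, 4, 6}, 1, 6, 2, 3)
	fmt.Println(t, v) // [1 3 5] [1 3 4]
}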

func NewInstantVectorCursor added in v1.3.0

func NewInstantVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, tr util.TimeRange) *InstantVectorCursor

func (*InstantVectorCursor) Close added in v1.3.0

func (c *InstantVectorCursor) Close() error

func (*InstantVectorCursor) EndSpan added in v1.3.0

func (c *InstantVectorCursor) EndSpan()

func (*InstantVectorCursor) GetSchema added in v1.3.0

func (c *InstantVectorCursor) GetSchema() record.Schemas

func (*InstantVectorCursor) Name added in v1.3.0

func (c *InstantVectorCursor) Name() string

func (*InstantVectorCursor) Next added in v1.3.0

func (c *InstantVectorCursor) Next() (*record.Record, comm.SeriesInfoIntf, error)

func (*InstantVectorCursor) NextAggData added in v1.3.0

func (c *InstantVectorCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*InstantVectorCursor) SetOps added in v1.3.0

func (c *InstantVectorCursor) SetOps(ops []*comm.CallOption)

func (*InstantVectorCursor) SetSchema added in v1.3.0

func (c *InstantVectorCursor) SetSchema(inSchema, outSchema record.Schemas, exprOpt []hybridqp.ExprOptions)

func (*InstantVectorCursor) SinkPlan added in v1.3.0

func (c *InstantVectorCursor) SinkPlan(plan hybridqp.QueryNode)

func (*InstantVectorCursor) StartSpan added in v1.3.0

func (c *InstantVectorCursor) StartSpan(span *tracing.Span)

type IntegerHeapItem

type IntegerHeapItem struct {
	// contains filtered or unexported fields
}

func NewIntegerHeapItem

func NewIntegerHeapItem(n int, cmpByValue, cmpByTime func(a, b *IntegerPointItem) bool) *IntegerHeapItem

func (*IntegerHeapItem) Len

func (f *IntegerHeapItem) Len() int

func (*IntegerHeapItem) Less

func (f *IntegerHeapItem) Less(i, j int) bool

func (*IntegerHeapItem) Pop

func (f *IntegerHeapItem) Pop() interface{}

func (*IntegerHeapItem) Push

func (f *IntegerHeapItem) Push(x interface{})

func (*IntegerHeapItem) Reset

func (f *IntegerHeapItem) Reset()

func (*IntegerHeapItem) Swap

func (f *IntegerHeapItem) Swap(i, j int)

type IntegerPointItem

type IntegerPointItem struct {
	// contains filtered or unexported fields
}

func NewIntegerPointItem

func NewIntegerPointItem(time int64, value int64) *IntegerPointItem

type LogReplay added in v1.0.1

type LogReplay struct {
	// contains filtered or unexported fields
}

type LogReplays added in v1.0.1

type LogReplays []LogReplay

type LogWriter

type LogWriter struct {
	SyncInterval time.Duration
	// contains filtered or unexported fields
}

func (*LogWriter) Switch

func (w *LogWriter) Switch() ([]string, error)

func (*LogWriter) Write

func (w *LogWriter) Write(compBuf []byte) error

type LogWriters added in v1.0.1

type LogWriters []LogWriter

type MaxPromOp added in v1.3.0

type MaxPromOp struct {
	BasePromOp
}

func (*MaxPromOp) CreateRoutine added in v1.3.0

func (c *MaxPromOp) CreateRoutine(p *AggParams) (Routine, error)

type MemDataReader added in v1.4.1

type MemDataReader interface {
	Ref()
	UnRef()
	Values(msName string, id uint64, tr util.TimeRange, schema record.Schemas, ascending bool) *record.Record
}

type MetaDownSample added in v1.5.0

type MetaDownSample interface {
	UpdateShardDownSampleInfo(Ident *meta.ShardIdentifier) error
}

type MetaIndexIterator added in v1.0.0

type MetaIndexIterator struct {
	// contains filtered or unexported fields
}

func NewMetaIndexIterators added in v1.0.0

func NewMetaIndexIterators(file immutable.TSSPFile, querySchema *executor.QuerySchema) (*MetaIndexIterator, error)

type MinPromOp added in v1.3.0

type MinPromOp struct {
	BasePromOp
}

func (*MinPromOp) CreateRoutine added in v1.3.0

func (c *MinPromOp) CreateRoutine(p *AggParams) (Routine, error)

type NewEngineFun added in v1.5.0

type NewEngineFun func(dataPath, walPath string, options EngineOptions, ctx *metaclient.LoadCtx) (Engine, error)

func GetNewEngineFunction added in v1.5.0

func GetNewEngineFunction(entType string) NewEngineFun
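
GetNewEngineFunction implies a constructor registry keyed by engine-type name. A self-contained sketch of that pattern follows; the "ts" type string is illustrative (not a documented registered value), and the option and context parameters of NewEngineFun are omitted for brevity.

package main

import "fmt"

type Engine interface{ Open() error }

type newEngineFun func(dataPath, walPath string) (Engine, error)

var registry = map[string]newEngineFun{}

func register(name string, fn newEngineFun) { registry[name] = fn }

// getNewEngineFunction returns nil for an unknown engine type.
func getNewEngineFunction(entType string) newEngineFun { return registry[entType] }

type tsEngine struct{ data, wal string }

func (e *tsEngine) Open() error { return nil }

func main() {
	register("ts", func(d, w string) (Engine, error) { return &tsEngine{d, w}, nil })
	if fn := getNewEngineFunction("ts"); fn != nil {
		eng, _ := fn("/data", "/wal")
		fmt.Printf("%T\n", eng)
	}
}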

type NodeInfo added in v1.3.0

type NodeInfo struct {
	// contains filtered or unexported fields
}

func (*NodeInfo) GenShardDirName added in v1.3.0

func (n *NodeInfo) GenShardDirName()

type PreAggTagSetCursor

type PreAggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewPreAggTagSetCursor

func NewPreAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor) *PreAggTagSetCursor

func (*PreAggTagSetCursor) Close

func (s *PreAggTagSetCursor) Close() error

func (*PreAggTagSetCursor) EndSpan

func (s *PreAggTagSetCursor) EndSpan()

func (*PreAggTagSetCursor) GetSchema

func (s *PreAggTagSetCursor) GetSchema() record.Schemas

func (*PreAggTagSetCursor) Name

func (s *PreAggTagSetCursor) Name() string

func (*PreAggTagSetCursor) Next

func (*PreAggTagSetCursor) NextAggData

func (s *PreAggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*PreAggTagSetCursor) RecordInitPreAgg

func (s *PreAggTagSetCursor) RecordInitPreAgg() error

func (*PreAggTagSetCursor) SetOps

func (s *PreAggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*PreAggTagSetCursor) SetSchema

func (s *PreAggTagSetCursor) SetSchema(schema record.Schemas)

func (*PreAggTagSetCursor) SinkPlan

func (s *PreAggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*PreAggTagSetCursor) StartSpan

func (s *PreAggTagSetCursor) StartSpan(span *tracing.Span)

type PromFuncParam added in v1.3.0

type PromFuncParam struct {
	// contains filtered or unexported fields
}

type PromFunction added in v1.3.0

type PromFunction interface {
	CreateRoutine(param *PromFuncParam) (Routine, error)
}

func GetPromFunction added in v1.3.0

func GetPromFunction(name string) PromFunction

type PromFunctionFactory added in v1.3.0

type PromFunctionFactory map[string]PromFunction

type PtLoadFunc added in v1.5.0

type PtLoadFunc func(db string, ptID uint32)

type PtNNLock

type PtNNLock struct {
}

type RaftMessage added in v1.5.0

type RaftMessage interface {
	SendRaftMessage(database string, ptId uint64, msg raftpb.Message) error
}

type RangeVectorCursor added in v1.3.0

type RangeVectorCursor struct {
	// contains filtered or unexported fields
}

RangeVectorCursor is used to evaluate functions with a range duration for prom instant_query or range_query; this is a kind of sliding-window calculation. For an aggregated query, replace aggregateCursor with RangeVectorCursor. For example, avg_over_time(value[3]) with start=1, end=6, step=2, rangeDuration=3 gives startSample=1, endSample=5:

	original data:   (time, value) => [(1, 1.0), (2, 2.0), (3, 3.0), (4, 4.0), (6, 6.0)]
	interval index:  [start, end)  => [[0, 1), [0, 3), [1, 4)]
	grouped data:    (time, value) => [[(1, 1.0)], [(1, 1.0), (2, 2.0), (3, 3.0)], [(2, 2.0), (3, 3.0), (4, 4.0)]]
	aggregated data: (time, value) => [(1, 1.0), (3, 2.0), (5, 3.0)]
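
A runnable sketch of the sliding-window rule above, reproducing the avg_over_time example on plain slices: each sample timestamp ts aggregates the points whose time lies in [ts-rangeDuration, ts] (closed on both ends, as the example implies).

package main

import "fmt"

func avgOverTime(times []int64, values []float64, start, end, step, rangeDuration int64) ([]int64, []float64) {
	var outT []int64
	var outV []float64
	for ts := start; ts <= end; ts += step {
		lo, hi := 0, 0
		for i, t := range times { // times are ascending
			if t < ts-rangeDuration {
				lo = i + 1 // first index inside the window
			}
			if t <= ts {
				hi = i + 1 // one past the last index inside the window
			}
		}
		if hi > lo {
			sum := 0.0
			for _, v := range values[lo:hi] {
				sum += v
			}
			outT = append(outT, ts)
			outV = append(outV, sum/float64(hi-lo))
		}
	}
	return outT, outV
}

func main() {
	t, v := avgOverTime([]int64{1, 2, 3, 4, 6}, []float64{1, 2, 3, 4, 6}, 1, 6, 2, 3)
	fmt.Println(t, v) // [1 3 5] [1 2 3]
}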

func NewRangeVectorCursor added in v1.3.0

func NewRangeVectorCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, tr util.TimeRange) *RangeVectorCursor

func (*RangeVectorCursor) Close added in v1.3.0

func (c *RangeVectorCursor) Close() error

func (*RangeVectorCursor) EndSpan added in v1.3.0

func (c *RangeVectorCursor) EndSpan()

func (*RangeVectorCursor) GetSchema added in v1.3.0

func (c *RangeVectorCursor) GetSchema() record.Schemas

func (*RangeVectorCursor) Name added in v1.3.0

func (c *RangeVectorCursor) Name() string

func (*RangeVectorCursor) Next added in v1.3.0

func (c *RangeVectorCursor) Next() (*record.Record, comm.SeriesInfoIntf, error)

func (*RangeVectorCursor) NextAggData added in v1.3.0

func (c *RangeVectorCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*RangeVectorCursor) SetOps added in v1.3.0

func (c *RangeVectorCursor) SetOps(ops []*comm.CallOption)

func (*RangeVectorCursor) SetSchema added in v1.3.0

func (c *RangeVectorCursor) SetSchema(inSchema, outSchema record.Schemas, exprOpt []hybridqp.ExprOptions)

func (*RangeVectorCursor) SinkPlan added in v1.3.0

func (c *RangeVectorCursor) SinkPlan(plan hybridqp.QueryNode)

func (*RangeVectorCursor) StartSpan added in v1.3.0

func (c *RangeVectorCursor) StartSpan(span *tracing.Span)

type Reducer

type Reducer interface {
	Aggregate(*ReducerEndpoint, *ReducerParams)
}

type ReducerEndpoint

type ReducerEndpoint struct {
	InputPoint  EndPointPair
	OutputPoint EndPointPair
}

type ReducerParams

type ReducerParams struct {
	// contains filtered or unexported fields
}

type ReplayCallFuncType added in v1.4.0

type ReplayCallFuncType func(binary []byte, rowsCtx *walRowsObjects, writeWalType WalRecordType, logReplay LogReplay) error

type Routine

type Routine interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type RoutineImpl

type RoutineImpl struct {
	// contains filtered or unexported fields
}

func NewRoutineImpl

func NewRoutineImpl(reducer Reducer, inOrdinal int, outOrdinal int) *RoutineImpl

func (*RoutineImpl) WorkOnRecord

func (r *RoutineImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)
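
RoutineImpl binds a Reducer to one input and one output column ordinal, so a single reducer implementation can be reused across columns. A compilable analog with local stand-ins (the real types operate on record.Record columns and a *ReducerParams, whose fields are unexported):

package main

import "fmt"

type reducer interface {
	Aggregate(in, out []float64)
}

type sumReducer struct{}

func (sumReducer) Aggregate(in, out []float64) {
	s := 0.0
	for _, v := range in {
		s += v
	}
	out[0] = s
}

// routineImpl routes one reducer between a fixed input and output column.
type routineImpl struct {
	r          reducer
	inOrdinal  int
	outOrdinal int
}

func (r *routineImpl) WorkOnRecord(in, out [][]float64) {
	r.r.Aggregate(in[r.inOrdinal], out[r.outOrdinal])
}

func main() {
	rt := &routineImpl{r: sumReducer{}, inOrdinal: 1, outOrdinal: 0}
	in := [][]float64{{9}, {1, 2, 3}}
	out := [][]float64{{0}}
	rt.WorkOnRecord(in, out)
	fmt.Println(out[0][0]) // 6
}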

type SeriesIter

type SeriesIter struct {
	// contains filtered or unexported fields
}

type Shard

type Shard interface {
	// IO interface
	WriteRows(rows []influx.Row, binaryRows []byte) error               // line protocol
	WriteCols(mst string, cols *record.Record, binaryCols []byte) error // native protocol
	WriteBlobs(group *shelf.BlobGroup) error
	ForceFlush()
	WaitWriteFinish()
	CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)
	CreateCursor(ctx context.Context, schema *executor.QuerySchema) (comm.TSIndexInfo, error)
	Scan(span *tracing.Span, schema *executor.QuerySchema, callBack func(num int64) error) (tsi.GroupSeries, int64, uint64, error)
	ScanWithInvertedIndex(span *tracing.Span, ctx context.Context, sources influxql.Sources, schema *executor.QuerySchema) (tsi.GroupSeries, int64, error)
	ScanWithSparseIndex(ctx context.Context, schema *executor.QuerySchema, callBack func(num int64) error) (*executor.FileFragments, error)
	GetIndexInfo(schema *executor.QuerySchema) (*executor.AttachedIndexInfo, error)
	RowCount(schema *executor.QuerySchema) (int64, error)
	NewShardKeyIdx(shardType, dataPath string, lockPath *string) error

	// admin
	OpenAndEnable(client metaclient.MetaClient) error
	FreeSequencer()
	IsOpened() bool
	Close() error
	ChangeShardTierToWarm()
	DropMeasurement(ctx context.Context, name string) error
	GetSplitPoints(idxes []int64) ([]string, error) // only work for tsstore (depends on sid)

	// get private member
	GetDataPath() string
	GetObsOption() *obs.ObsOptions
	GetWalPath() string
	GetDuration() *meta.DurationDescriptor
	GetEngineType() config.EngineType
	GetIdent() *meta.ShardIdentifier
	GetID() uint64
	GetRowCount() uint64
	GetRPName() string
	GetStatistics(buffer []byte) ([]byte, error)
	GetMaxTime() int64
	GetStartTime() time.Time
	Intersect(tr *influxql.TimeRange) bool
	GetIndexBuilder() *tsi.IndexBuilder                                // only work for tsstore(tsi)
	GetSeriesCount() int                                               // only work for tsstore
	GetTableStore() immutable.TablesStore                              // used by downsample and test
	GetTSSPFiles(mm string, isOrder bool) (*immutable.TSSPFiles, bool) // used by downsample and test
	GetTier() uint64
	IsExpired() bool
	IsTierExpired() bool
	GetEndTime() time.Time

	// downsample, only work for tsstore
	CanDoDownSample() bool
	DisableDownSample()
	EnableDownSample()
	GetShardDownSamplePolicy(policy *meta.DownSamplePolicyInfo) *meta.ShardDownSamplePolicyInfo
	IsOutOfOrderFilesExist() bool
	NewDownSampleTask(sdsp *meta.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger)
	SetShardDownSampleLevel(i int)
	SetObsOption(option *obs.ObsOptions)
	StartDownSample(taskID uint64, level int, sdsp *meta.ShardDownSamplePolicyInfo, meta MetaDownSample) error
	UpdateDownSampleOnShard(id uint64, level int)
	UpdateShardReadOnly(meta MetaDownSample) error

	// compaction && merge, only work for tsstore
	Compact() error
	DisableCompAndMerge()
	EnableCompAndMerge()
	SetLockPath(lock *string)
	IsColdShard() bool
	CanDoShardMove() bool
	UnregisterShard()
	ExecShardMove() error
	DisableHierarchicalStorage()
	SetEnableHierarchicalStorage()
	CreateDDLBasePlan(client metaclient.MetaClient, ddl hybridqp.DDLType) immutable.DDLBasePlan
	CreateConsumeIterator(mst string, opt *query.ProcessorOptions) record.Iterator

	// raft SnapShot
	SetSnapShotter(snp *raftlog.SnapShotter)

	MergeToDstShard(dstShardPath string, maxFileSeq uint64, lock fileops.FileLockOption) error
	GetFileSeq() uint64
	ReplaceByNoClearShardId(noClearShard uint64) (map[string][]immutable.TSSPFile, map[string][]immutable.TSSPFile, uint64, error)
	ClearOldTsspFiles(map[string][]immutable.TSSPFile, map[string][]immutable.TSSPFile) error
	RecoverTier(tier uint64)
}
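
A sketch of the basic write path the Shard interface implies: write rows, force a flush, wait for in-flight writes, then close. The narrow local interface and no-op implementation are stand-ins so the sketch compiles without the engine module, and the call order is an assumption; influx.Row is replaced by an empty struct.

package main

import "fmt"

type row struct{} // stand-in for influx.Row

type shardWriter interface {
	WriteRows(rows []row, binaryRows []byte) error
	ForceFlush()
	WaitWriteFinish()
	Close() error
}

type noopShard struct{}

func (noopShard) WriteRows(rows []row, binaryRows []byte) error { return nil }
func (noopShard) ForceFlush()                                   {}
func (noopShard) WaitWriteFinish()                              {}
func (noopShard) Close() error                                  { return nil }

func writeAndSeal(s shardWriter, rows []row) error {
	if err := s.WriteRows(rows, nil); err != nil { // nil: no pre-encoded payload
		return err
	}
	s.ForceFlush()      // push mutable data toward immutable storage
	s.WaitWriteFinish() // block until in-flight writes settle
	return s.Close()
}

func main() {
	fmt.Println(writeAndSeal(noopShard{}, []row{{}, {}}))
}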

type ShardMoveFileInfo added in v1.3.0

type ShardMoveFileInfo struct {
	Name       string   // mst names in this shard
	LocalFile  []string // local files involved in this shard move
	RemoteFile []string
}

type ShardStatus added in v1.1.0

type ShardStatus struct {
	ShardId  uint64
	Opened   bool
	ReadOnly bool
}

func (ShardStatus) MarshalText added in v1.1.0

func (s ShardStatus) MarshalText() (data []byte, err error)

MarshalText preserves the order of the marshaled dict items.
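
Why this matters: Go map iteration order is randomized, so a TextMarshaler that wants byte-stable output must fix the item order itself. A local stand-in with an assumed output layout (the package's actual text format is not shown here):

package main

import "fmt"

type shardStatus struct { // local stand-in for engine.ShardStatus
	ShardId  uint64
	Opened   bool
	ReadOnly bool
}

func (s shardStatus) MarshalText() ([]byte, error) {
	// Fields are emitted in a fixed order, so repeated calls are identical.
	return []byte(fmt.Sprintf("{ShardId: %d, Opened: %v, ReadOnly: %v}",
		s.ShardId, s.Opened, s.ReadOnly)), nil
}

func main() {
	b, _ := shardStatus{ShardId: 7, Opened: true}.MarshalText()
	fmt.Println(string(b))
}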

type Storage added in v1.1.0

type Storage interface {
	WriteCols(s *shard, cols *record.Record, mst string, binaryCols []byte) error // native protocol
	WriteIndex(idx *tsi.IndexBuilder, mw *mstWriteCtx) func() error

	SetAccumulateMetaIndex(name string, detachedMetaInfo *immutable.AccumulateMetaIndex)
	ForceFlush(s *shard)
	// contains filtered or unexported methods
}

type StorageService added in v1.5.0

type StorageService interface {
	Write(db, rp, mst string, ptId uint32, shardID uint64, writeData func() error) error
	WriteDataFunc(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, binaryRows []byte, index *raftlog.SnapShotter) error
	GetNodeId() uint64
}

type StreamDetachedReader added in v1.3.0

type StreamDetachedReader struct {
	// contains filtered or unexported fields
}

StreamDetachedReader implements comm.KeyCursor and comm.TimeCutKeyCursor; it streams reads of detached data to reduce BF (Bloom filter) I/O.

func NewStreamDetachedReader added in v1.3.0

func NewStreamDetachedReader(readerCtx *immutable.FileReaderContext, path *sparseindex.OBSFilterPath, ctx *indexContext) *StreamDetachedReader

func (*StreamDetachedReader) Close added in v1.3.0

func (t *StreamDetachedReader) Close() error

func (*StreamDetachedReader) EndSpan added in v1.3.0

func (t *StreamDetachedReader) EndSpan()

func (*StreamDetachedReader) GetSchema added in v1.3.0

func (t *StreamDetachedReader) GetSchema() record.Schemas

func (*StreamDetachedReader) Init added in v1.3.0

func (r *StreamDetachedReader) Init() (err error)

func (*StreamDetachedReader) Name added in v1.3.0

func (t *StreamDetachedReader) Name() string

func (*StreamDetachedReader) Next added in v1.3.0

func (*StreamDetachedReader) NextAggData added in v1.3.0

func (t *StreamDetachedReader) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*StreamDetachedReader) SetOps added in v1.3.0

func (t *StreamDetachedReader) SetOps(ops []*comm.CallOption)

func (*StreamDetachedReader) SinkPlan added in v1.3.0

func (t *StreamDetachedReader) SinkPlan(plan hybridqp.QueryNode)

func (*StreamDetachedReader) StartSpan added in v1.3.0

func (t *StreamDetachedReader) StartSpan(span *tracing.Span)

func (*StreamDetachedReader) UpdateTime added in v1.3.0

func (t *StreamDetachedReader) UpdateTime(time int64)

type StreamHandler added in v1.4.0

type StreamHandler func(rows influx.Rows, isLastRows bool, fileNames []string) error

type StreamWalManager added in v1.4.0

type StreamWalManager struct {
	// contains filtered or unexported fields
}

func NewStreamWalManager added in v1.4.0

func NewStreamWalManager() *StreamWalManager

func (*StreamWalManager) Add added in v1.4.0

func (m *StreamWalManager) Add(files *WalFiles)

func (*StreamWalManager) CleanLoadFiles added in v1.4.0

func (m *StreamWalManager) CleanLoadFiles()

func (*StreamWalManager) Free added in v1.4.0

func (m *StreamWalManager) Free(tm int64)

func (*StreamWalManager) InitStreamHandler added in v1.4.0

func (m *StreamWalManager) InitStreamHandler(f StreamHandler)

func (*StreamWalManager) LastIndex added in v1.5.0

func (m *StreamWalManager) LastIndex(ptID uint32) int

func (*StreamWalManager) Load added in v1.4.0

func (m *StreamWalManager) Load(dir string, lock *string) error

func (*StreamWalManager) Replay added in v1.4.0

func (m *StreamWalManager) Replay(ctx context.Context, ptID uint32, isLastPT bool, hasData *bool) error

type TSIndexInfoImpl added in v1.4.0

type TSIndexInfoImpl struct {
	// contains filtered or unexported fields
}

func (*TSIndexInfoImpl) GetCursors added in v1.4.0

func (f *TSIndexInfoImpl) GetCursors() []comm.KeyCursor

func (*TSIndexInfoImpl) IsEmpty added in v1.4.0

func (f *TSIndexInfoImpl) IsEmpty() bool

func (*TSIndexInfoImpl) Ref added in v1.4.0

func (f *TSIndexInfoImpl) Ref()

func (*TSIndexInfoImpl) RefFiles added in v1.4.0

func (f *TSIndexInfoImpl) RefFiles()

func (*TSIndexInfoImpl) RefMemTables added in v1.4.0

func (f *TSIndexInfoImpl) RefMemTables()

func (*TSIndexInfoImpl) SetCursors added in v1.4.0

func (f *TSIndexInfoImpl) SetCursors(cursors []comm.KeyCursor)

func (*TSIndexInfoImpl) UnRefFiles added in v1.4.0

func (f *TSIndexInfoImpl) UnRefFiles()

func (*TSIndexInfoImpl) UnRefMemTables added in v1.4.0

func (f *TSIndexInfoImpl) UnRefMemTables()

func (*TSIndexInfoImpl) Unref added in v1.4.0

func (f *TSIndexInfoImpl) Unref()

type TagSetCursorItem

type TagSetCursorItem struct {
	// contains filtered or unexported fields
}

func (TagSetCursorItem) GetNewRecord

func (c TagSetCursorItem) GetNewRecord() (*record.Record, error)

type TierInfo

type TierInfo struct {
}

type TsspSequenceReader added in v1.0.0

type TsspSequenceReader struct {
	executor.BaseProcessor

	Output *executor.SeriesRecordPort
	// contains filtered or unexported fields
}

func (*TsspSequenceReader) Close added in v1.0.0

func (r *TsspSequenceReader) Close()

func (*TsspSequenceReader) Create added in v1.0.0

func (*TsspSequenceReader) Explain added in v1.0.0

func (r *TsspSequenceReader) Explain() []executor.ValuePair

func (*TsspSequenceReader) GetInputNumber added in v1.0.0

func (r *TsspSequenceReader) GetInputNumber(executor.Port) int

func (*TsspSequenceReader) GetInputs added in v1.0.0

func (r *TsspSequenceReader) GetInputs() executor.Ports

func (*TsspSequenceReader) GetOutputNumber added in v1.0.0

func (r *TsspSequenceReader) GetOutputNumber(executor.Port) int

func (*TsspSequenceReader) GetOutputs added in v1.0.0

func (r *TsspSequenceReader) GetOutputs() executor.Ports

func (*TsspSequenceReader) IsSink added in v1.0.0

func (r *TsspSequenceReader) IsSink() bool

func (*TsspSequenceReader) Name added in v1.0.0

func (r *TsspSequenceReader) Name() string

func (*TsspSequenceReader) Release added in v1.0.0

func (r *TsspSequenceReader) Release() error

func (*TsspSequenceReader) Work added in v1.0.0

func (r *TsspSequenceReader) Work(ctx context.Context) error

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func NewWAL

func NewWAL(path string, lockPath *string, shardID uint64, walSyncInterval time.Duration, walEnabled, replayParallel bool, partitionNum int, walReplayBatchSize int) *WAL

func (*WAL) Close

func (l *WAL) Close() error

func (*WAL) Replay

func (l *WAL) Replay(ctx context.Context, callBack ReplayCallFuncType) ([]string, error)

func (*WAL) Switch

func (l *WAL) Switch() (*WalFiles, error)

func (*WAL) SwitchReplay added in v1.5.0

func (l *WAL) SwitchReplay() *WalFiles

func (*WAL) Write

func (l *WAL) Write(rows []byte, typ WalRecordType, maxRowTime int64) error
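
A toy sketch of the Write/Switch/Replay lifecycle these methods suggest: records are appended to the active segment, Switch rotates it out for flushing, and Replay re-applies every record through a callback (cf. ReplayCallFuncType). The file layout and newline framing below are invented for illustration; the real WAL adds CRCs, compression, partitioning, and typed records.

package main

import (
	"bufio"
	"fmt"
	"os"
)

type wal struct {
	path string
	f    *os.File
}

func openWAL(path string) (*wal, error) {
	f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o640)
	if err != nil {
		return nil, err
	}
	return &wal{path: path, f: f}, nil
}

func (w *wal) Write(rec []byte) error {
	_, err := fmt.Fprintf(w.f, "%s\n", rec) // toy framing: newline-delimited
	return err
}

// Switch closes the active segment and returns its path for flushing.
func (w *wal) Switch() (string, error) {
	return w.path, w.f.Close()
}

// Replay feeds every stored record to the callback, oldest first.
func (w *wal) Replay(callBack func(rec []byte) error) error {
	f, err := os.Open(w.path)
	if err != nil {
		return err
	}
	defer f.Close()
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		if err := callBack(append([]byte(nil), sc.Bytes()...)); err != nil {
			return err
		}
	}
	return sc.Err()
}

func main() {
	w, _ := openWAL("demo.wal")
	_ = w.Write([]byte("cpu,host=a value=1"))
	seg, _ := w.Switch()
	_ = w.Replay(func(rec []byte) error { fmt.Println(string(rec)); return nil })
	_ = os.Remove(seg)
}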

type WalFiles added in v1.4.0

type WalFiles struct {
	// contains filtered or unexported fields
}

func (*WalFiles) Add added in v1.4.0

func (w *WalFiles) Add(files ...string)

type WalRecordType

type WalRecordType byte

type WriteIntoStorageTransform added in v1.0.0

type WriteIntoStorageTransform struct {
	executor.BaseProcessor

	Input  *executor.SeriesRecordPort
	Output *executor.DownSampleStatePort
	// contains filtered or unexported fields
}

func (*WriteIntoStorageTransform) Close added in v1.0.0

func (r *WriteIntoStorageTransform) Close()

func (*WriteIntoStorageTransform) Create added in v1.0.0

func (*WriteIntoStorageTransform) EndFile added in v1.0.0

func (r *WriteIntoStorageTransform) EndFile() error

func (*WriteIntoStorageTransform) Explain added in v1.0.0

func (*WriteIntoStorageTransform) GetClosed added in v1.0.0

func (r *WriteIntoStorageTransform) GetClosed() chan struct{}

func (*WriteIntoStorageTransform) GetInputNumber added in v1.0.0

func (r *WriteIntoStorageTransform) GetInputNumber(executor.Port) int

func (*WriteIntoStorageTransform) GetInputs added in v1.0.0

func (r *WriteIntoStorageTransform) GetInputs() executor.Ports

func (*WriteIntoStorageTransform) GetOutputNumber added in v1.0.0

func (r *WriteIntoStorageTransform) GetOutputNumber(executor.Port) int

func (*WriteIntoStorageTransform) GetOutputs added in v1.0.0

func (r *WriteIntoStorageTransform) GetOutputs() executor.Ports

func (*WriteIntoStorageTransform) GetRowCount added in v1.0.0

func (r *WriteIntoStorageTransform) GetRowCount() int

func (*WriteIntoStorageTransform) InitFile added in v1.1.0

func (r *WriteIntoStorageTransform) InitFile(sRecord *executor.SeriesRecord) error

func (*WriteIntoStorageTransform) Name added in v1.0.0

func (*WriteIntoStorageTransform) Release added in v1.0.0

func (r *WriteIntoStorageTransform) Release() error

func (*WriteIntoStorageTransform) SetTaskId added in v1.0.0

func (r *WriteIntoStorageTransform) SetTaskId(taskID int)

func (*WriteIntoStorageTransform) Work added in v1.0.0
