Documentation
¶
Overview ¶
Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func AddLocationsWithFirstTime(l *immutable.LocationCursor, files immutable.TableReaders, ...) (int64, error)
- func AddLocationsWithInit(l *immutable.LocationCursor, files immutable.TableReaders, ...) error
- func AddLocationsWithLimit(l *immutable.LocationCursor, files immutable.TableReaders, ...) (int64, error)
- func AppendColumnTimes(bitmap []bool, column executor.Column, columnTimes []int64, ...)
- func AppendNilRowWithTime(rec *record.Record, t int64)
- func AppendRecWithNilRows(rec, re *record.Record, opt hybridqp.Options, ...)
- func CanNotAggOnSeriesFunc(m map[string]*influxql.Call) bool
- func GetMaxTime(maxTime int64, rec *record.Record, isAscending bool) int64
- func GetMinTime(minTime int64, rec *record.Record, isAscending bool) int64
- func NewAggregateCursor(input comm.KeyCursor, schema *executor.QuerySchema, ...) *aggregateCursor
- func NewChunkReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, ...) executor.Processor
- func NewEngine(dataPath, walPath string, options netstorage.EngineOptions, ctx *meta.LoadCtx) (netstorage.Engine, error)
- func NewFencer(dataPath, walPath, db string, pt uint32) fencer
- func NewFileLoopCursor(ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema, ...) *fileLoopCursor
- func NewFileSequenceAggregator(schema hybridqp.Catalog, addPrefix bool, shardStartTime, shardEndTime int64) executor.Processor
- func NewFloatColFloatHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, ...) *floatColFloatHeapReducer
- func NewIntegerColIntegerHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, ...) *integerColIntegerHeapReducer
- func NewLimitCursor(schema *executor.QuerySchema, ...) *limitCursor
- func NewRecordSchema(querySchema *executor.QuerySchema, auxTags []string, schema record.Schemas, ...) ([]string, record.Schemas)
- func NewSeriesInfoPool(num int64) *filesInfoPool
- func NewShard(dataPath, walPath string, lockPath *string, ident *meta.ShardIdentifier, ...) *shard
- func NewTagSetCursorForTest(schema *executor.QuerySchema, seriesN int) *tagSetCursor
- func NewTopNLinkedList(n int, ascending bool) *topNLinkedList
- func NewTsspSequenceReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, ...) executor.Processor
- func NewWriteIntoStorageTransform(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, ...) executor.Processor
- func RecordCutNormal(start, end int, src, dst *record.Record)
- func RegisterShardTask(taskContent *ShardTaskContent, f func(content *ShardTaskContent) error) int
- func RemoveTask(shardID uint64)
- func SetFullCompColdDuration(d time.Duration)
- func SetNextMethod(cursor comm.KeyCursor)
- func StartRegisterTask(id uint64) error
- type AggTagSetCursor
- func (s *AggTagSetCursor) Close() error
- func (s *AggTagSetCursor) EndSpan()
- func (s *AggTagSetCursor) GetIndex(t int64) int64
- func (s *AggTagSetCursor) GetSchema() record.Schemas
- func (s *AggTagSetCursor) Init()
- func (s *AggTagSetCursor) Name() string
- func (s *AggTagSetCursor) Next() (*record.Record, comm.SeriesInfoIntf, error)
- func (s *AggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)
- func (s *AggTagSetCursor) NextWithMultipleSeries() (*record.Record, comm.SeriesInfoIntf, error)
- func (s *AggTagSetCursor) NextWithSingleSeries() (*record.Record, comm.SeriesInfoIntf, error)
- func (s *AggTagSetCursor) RecordInit() error
- func (s *AggTagSetCursor) SetOps(ops []*comm.CallOption)
- func (s *AggTagSetCursor) SetParaForTest(schema record.Schemas)
- func (s *AggTagSetCursor) SetSchema(schema record.Schemas)
- func (s *AggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)
- func (s *AggTagSetCursor) StartSpan(span *tracing.Span)
- func (s *AggTagSetCursor) TagAuxHandler(re *record.Record, start, end int)
- func (s *AggTagSetCursor) TimeWindowsInit()
- func (s *AggTagSetCursor) UpdateRec(recRow, chunkRow int)
- type ChunkMetaByField
- type ChunkMetaByFieldIters
- type ChunkReader
- func (r *ChunkReader) Abort()
- func (r *ChunkReader) Close()
- func (r *ChunkReader) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
- func (r *ChunkReader) Explain() []executor.ValuePair
- func (r *ChunkReader) GetInputNumber(executor.Port) int
- func (r *ChunkReader) GetInputs() executor.Ports
- func (r *ChunkReader) GetOutputNumber(executor.Port) int
- func (r *ChunkReader) GetOutputs() executor.Ports
- func (r *ChunkReader) IsSink() bool
- func (r *ChunkReader) Name() string
- func (r *ChunkReader) Release() error
- func (r *ChunkReader) Work(ctx context.Context) error
- type CoProcessor
- type CoProcessorImpl
- type Compactor
- func (c *Compactor) RegisterShard(sh *shard)
- func (c *Compactor) SetAllOutOfOrderMergeSwitch(en bool)
- func (c *Compactor) SetAllShardsCompactionSwitch(en bool)
- func (c *Compactor) SetSnapshotColdDuration(d time.Duration)
- func (c *Compactor) ShardCompactionSwitch(shid uint64, en bool)
- func (c *Compactor) ShardOutOfOrderMergeSwitch(shid uint64, en bool)
- func (c *Compactor) UnregisterShard(shardId uint64)
- type DBPTInfo
- func (dbPT *DBPTInfo) LoadAllShards(rp string, shardIDs []uint64) error
- func (dbPT *DBPTInfo) NewShard(rp string, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo, ...) (Shard, error)
- func (dbPT *DBPTInfo) OpenIndexes(opId uint64, rp string) error
- func (dbPT *DBPTInfo) OpenShards(opId uint64, rp string, durationInfos map[uint64]*meta.ShardDurationInfo, ...) error
- func (dbPT *DBPTInfo) SetOption(opt netstorage.EngineOptions)
- func (dbPT *DBPTInfo) Shard(id uint64) Shard
- type DataBlockInfo
- type DownSampleFilesInfo
- type EndPointPair
- type Engine
- func (e *Engine) Assign(opId uint64, db string, ptId uint32, ver uint64, ...) error
- func (e *Engine) ChangeShardTierToWarm(db string, ptId uint32, shardID uint64) error
- func (e *Engine) Close() error
- func (e *Engine) CreateDBPT(db string, pt uint32)
- func (e *Engine) CreateLogicalPlan(ctx context.Context, db string, ptId uint32, shardID uint64, ...) (hybridqp.QueryNode, error)
- func (e *Engine) CreateShard(db, rp string, ptId uint32, shardID uint64, ...) error
- func (e *Engine) DbPTRef(db string, ptId uint32) error
- func (e *Engine) DbPTUnref(db string, ptId uint32)
- func (e *Engine) DeleteDatabase(db string, ptId uint32) (err error)
- func (e *Engine) DeleteIndex(db string, ptId uint32, indexID uint64) error
- func (e *Engine) DeleteShard(db string, ptId uint32, shardID uint64) error
- func (e *Engine) DropMeasurement(db string, rp string, name string, shardIds []uint64) error
- func (e *Engine) DropRetentionPolicy(db string, rp string, ptId uint32) error
- func (e *Engine) DropSeries(database string, sources []influxql.Source, ptId []uint32, ...) (int, error)
- func (e *Engine) ExpiredIndexes() []*meta2.IndexIdentifier
- func (e *Engine) ExpiredShards() []*meta2.ShardIdentifier
- func (e *Engine) FetchShardsNeedChangeStore() (shardsToWarm, shardsToCold []*meta2.ShardIdentifier)
- func (e *Engine) ForceFlush()
- func (e *Engine) GetDownSamplePolicy(key string) *meta2.StoreDownSamplePolicy
- func (e *Engine) GetLockFile() string
- func (e *Engine) GetShard(db string, ptId uint32, shardID uint64) Shard
- func (e *Engine) GetShardDownSampleLevel(db string, ptId uint32, shardID uint64) int
- func (e *Engine) GetShardDownSamplePolicyInfos(meta interface{ ... }) ([]*meta2.ShardDownSamplePolicyInfo, error)
- func (e *Engine) GetShardSplitPoints(db string, ptId uint32, shardID uint64, idxes []int64) ([]string, error)
- func (e *Engine) LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)
- func (e *Engine) Offload(db string, ptId uint32) error
- func (e *Engine) Open(durationInfos map[uint64]*meta2.ShardDurationInfo, m meta.MetaClient) error
- func (e *Engine) PreAssign(opId uint64, db string, ptId uint32, ...) error
- func (e *Engine) PreOffload(db string, ptId uint32) error
- func (e *Engine) RemoveDeadDownSamplePolicy()
- func (e *Engine) ResetDownSampleFlag()
- func (e *Engine) RollbackPreOffload(db string, ptId uint32) error
- func (e *Engine) SeriesCardinality(db string, ptIDs []uint32, namesWithVer [][]byte, condition influxql.Expr) ([]meta2.MeasurementCardinalityInfo, error)
- func (e *Engine) SeriesExactCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) (map[string]uint64, error)
- func (e *Engine) SeriesKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) ([]string, error)
- func (e *Engine) SetReadOnly(readonly bool)
- func (e *Engine) StartDownSampleTask(sdsp *meta2.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, ...) error
- func (e *Engine) Statistics(buffer []byte) ([]byte, error)
- func (e *Engine) SysCtrl(req *netstorage.SysCtrlRequest) error
- func (e *Engine) TagValues(db string, ptIDs []uint32, tagKeys map[string][][]byte, ...) (netstorage.TablesTagSets, error)
- func (e *Engine) TagValuesCardinality(db string, ptIDs []uint32, tagKeys map[string][][]byte, ...) (map[string]uint64, error)
- func (e *Engine) UpdateDownSampleInfo(policies *meta2.DownSamplePoliciesInfoWithDbRp)
- func (e *Engine) UpdateShardDownSampleInfo(infos *meta2.ShardDownSampleUpdateInfos)
- func (e *Engine) UpdateShardDurationInfo(info *meta2.ShardDurationInfo) error
- func (e *Engine) UpdateStoreDownSamplePolicies(info *meta2.DownSamplePolicyInfo, ident string)
- func (e *Engine) WriteRows(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, ...) error
- type FieldIter
- type FileSequenceAggregator
- func (r *FileSequenceAggregator) Aggregate()
- func (r *FileSequenceAggregator) AggregateSameSchema() error
- func (r *FileSequenceAggregator) Close()
- func (r *FileSequenceAggregator) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
- func (r *FileSequenceAggregator) Explain() []executor.ValuePair
- func (r *FileSequenceAggregator) GetInputNumber(executor.Port) int
- func (r *FileSequenceAggregator) GetInputs() executor.Ports
- func (r *FileSequenceAggregator) GetOutputNumber(executor.Port) int
- func (r *FileSequenceAggregator) GetOutputs() executor.Ports
- func (r *FileSequenceAggregator) GetProcessors()
- func (r *FileSequenceAggregator) IsSink() bool
- func (r *FileSequenceAggregator) Name() string
- func (r *FileSequenceAggregator) Release() error
- func (r *FileSequenceAggregator) SendRecord(re *record.Record)
- func (r *FileSequenceAggregator) Work(ctx context.Context) error
- type FloatHeapItem
- type FloatPointItem
- type IntegerHeapItem
- type IntegerPointItem
- type LogReplay
- type LogReplays
- type LogWriter
- type LogWriters
- type MetaIndexIterator
- type PreAggTagSetCursor
- func (s *PreAggTagSetCursor) Close() error
- func (s *PreAggTagSetCursor) EndSpan()
- func (s *PreAggTagSetCursor) GetSchema() record.Schemas
- func (s *PreAggTagSetCursor) Name() string
- func (s *PreAggTagSetCursor) Next() (*record.Record, comm.SeriesInfoIntf, error)
- func (s *PreAggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)
- func (s *PreAggTagSetCursor) RecordInitPreAgg() error
- func (s *PreAggTagSetCursor) SetOps(ops []*comm.CallOption)
- func (s *PreAggTagSetCursor) SetSchema(schema record.Schemas)
- func (s *PreAggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)
- func (s *PreAggTagSetCursor) StartSpan(span *tracing.Span)
- type PtNNLock
- type Reducer
- type ReducerEndpoint
- type ReducerParams
- type Routine
- type RoutineImpl
- type SeriesIter
- type Shard
- type ShardTaskContent
- type TagSetCursorItem
- type TierInfo
- type TsspSequenceReader
- func (r *TsspSequenceReader) Close()
- func (r *TsspSequenceReader) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
- func (r *TsspSequenceReader) Explain() []executor.ValuePair
- func (r *TsspSequenceReader) GetInputNumber(executor.Port) int
- func (r *TsspSequenceReader) GetInputs() executor.Ports
- func (r *TsspSequenceReader) GetOutputNumber(executor.Port) int
- func (r *TsspSequenceReader) GetOutputs() executor.Ports
- func (r *TsspSequenceReader) IsSink() bool
- func (r *TsspSequenceReader) Name() string
- func (r *TsspSequenceReader) Release() error
- func (r *TsspSequenceReader) Work(ctx context.Context) error
- type WAL
- type WalRecordType
- type WriteIntoStorageTransform
- func (r *WriteIntoStorageTransform) Close()
- func (r *WriteIntoStorageTransform) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
- func (r *WriteIntoStorageTransform) EndFile() error
- func (r *WriteIntoStorageTransform) Explain() []executor.ValuePair
- func (r *WriteIntoStorageTransform) GetClosed() chan struct{}
- func (r *WriteIntoStorageTransform) GetInputNumber(executor.Port) int
- func (r *WriteIntoStorageTransform) GetInputs() executor.Ports
- func (r *WriteIntoStorageTransform) GetOutputNumber(executor.Port) int
- func (r *WriteIntoStorageTransform) GetOutputs() executor.Ports
- func (r *WriteIntoStorageTransform) GetRowCount() int
- func (r *WriteIntoStorageTransform) Name() string
- func (r *WriteIntoStorageTransform) Release() error
- func (r *WriteIntoStorageTransform) SetTaskId(taskID int)
- func (r *WriteIntoStorageTransform) Work(ctx context.Context) error
- type Writer
Constants ¶
const ( SidSequenceReaderRecordNum = 6 SequenceAggRecordNum = 3 )
const ( MaxRetryUpdateOnShardNum = 4 CRCLen = 4 BufferSize = 1024 * 1024 )
const ( MaxRegisterNum = 64 RegisterSuccess = 1 RegisterFail = -1 )
const ( Failpoint = "failpoint" Readonly = "readonly" )
const ( TierLeveMem = 1 // in memory TierLeveLocalDisk = 2 TierLeveObjectStorage = 3 )
const ( DefaultFileSize = 10 * 1024 * 1024 WALFileSuffixes = "wal" WalRecordHeadSize = 1 + 4 WalCompBufSize = 256 * 1024 WalCompMaxBufSize = 2 * 1024 * 1024 )
Variables ¶
var ( RecordIteratorPool = &sync.Pool{} ChunkMetaBufferPool = bufferpool.NewByteBufferPool(defaultChunkMetaSize) )
var ( AggPool = record.NewRecordPool(record.AggPool) SeriesPool = record.NewRecordPool(record.SeriesPool) TsmMergePool = record.NewRecordPool(record.TsmMergePool) )
var ( TsspSequencePool = record.NewRecordPool(record.TsspSequencePool) SequenceAggPool = record.NewRecordPool(record.SequenceAggPool) )
var AppendManyNils map[int]func(colVal *record.ColVal, count int)
var (
DownSampleWriteDrop = true
)
var (
FileCursorPool = record.NewRecordPool(record.FileCursorPool)
)
var (
IntervalRecordPool = record.NewRecordPool(record.IntervalRecordPool)
)
Functions ¶
func AddLocationsWithFirstTime ¶
func AddLocationsWithFirstTime(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)
func AddLocationsWithInit ¶
func AddLocationsWithInit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) error
func AddLocationsWithLimit ¶
func AddLocationsWithLimit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)
func AppendColumnTimes ¶
func AppendNilRowWithTime ¶ added in v1.0.0
func AppendRecWithNilRows ¶ added in v1.0.0
func NewAggregateCursor ¶
func NewAggregateCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, hasAuxTags bool) *aggregateCursor
func NewChunkReader ¶
func NewChunkReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode, source influxql.Sources, schema *executor.QuerySchema, cursors []interface{}) executor.Processor
func NewEngine ¶
func NewEngine(dataPath, walPath string, options netstorage.EngineOptions, ctx *meta.LoadCtx) (netstorage.Engine, error)
func NewFileLoopCursor ¶
func NewFileLoopCursor(ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema, tagSet *tsi.TagSetInfo, start, step int, s *shard) *fileLoopCursor
func NewFileSequenceAggregator ¶ added in v1.0.0
func NewFloatColFloatHeapReducer ¶
func NewFloatColFloatHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, floatHeapItem *FloatHeapItem) *floatColFloatHeapReducer
func NewIntegerColIntegerHeapReducer ¶
func NewIntegerColIntegerHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, integerHeapItem *IntegerHeapItem) *integerColIntegerHeapReducer
func NewLimitCursor ¶
func NewLimitCursor(schema *executor.QuerySchema, helper func(start, end int, src, des *record.Record)) *limitCursor
func NewRecordSchema ¶
func NewSeriesInfoPool ¶
func NewSeriesInfoPool(num int64) *filesInfoPool
func NewShard ¶
func NewShard(dataPath, walPath string, lockPath *string, ident *meta.ShardIdentifier, durationInfo *meta.DurationDescriptor, tr *meta.TimeRangeInfo, options netstorage.EngineOptions) *shard
func NewTagSetCursorForTest ¶
func NewTagSetCursorForTest(schema *executor.QuerySchema, seriesN int) *tagSetCursor
NewTagSetCursorForTest for ut test, will remove later
func NewTopNLinkedList ¶
func NewTsspSequenceReader ¶ added in v1.0.0
func NewTsspSequenceReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode, source influxql.Sources, schema *executor.QuerySchema, files *immutable.TSSPFiles, newSeqs []uint64, stop chan struct{}) executor.Processor
func NewWriteIntoStorageTransform ¶ added in v1.0.0
func RecordCutNormal ¶
func RegisterShardTask ¶ added in v1.0.0
func RegisterShardTask(taskContent *ShardTaskContent, f func(content *ShardTaskContent) error) int
func RemoveTask ¶ added in v1.0.0
func RemoveTask(shardID uint64)
func SetFullCompColdDuration ¶
func SetNextMethod ¶ added in v1.0.0
SetNextMethod for test
func StartRegisterTask ¶ added in v1.0.0
Types ¶
type AggTagSetCursor ¶
type AggTagSetCursor struct {
// contains filtered or unexported fields
}
func NewAggTagSetCursor ¶
func NewAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor, singleSeries bool) *AggTagSetCursor
func (*AggTagSetCursor) Close ¶
func (s *AggTagSetCursor) Close() error
func (*AggTagSetCursor) EndSpan ¶
func (s *AggTagSetCursor) EndSpan()
func (*AggTagSetCursor) GetIndex ¶ added in v1.0.0
func (s *AggTagSetCursor) GetIndex(t int64) int64
func (*AggTagSetCursor) GetSchema ¶
func (s *AggTagSetCursor) GetSchema() record.Schemas
func (*AggTagSetCursor) Init ¶
func (s *AggTagSetCursor) Init()
func (*AggTagSetCursor) Name ¶
func (s *AggTagSetCursor) Name() string
func (*AggTagSetCursor) Next ¶
func (s *AggTagSetCursor) Next() (*record.Record, comm.SeriesInfoIntf, error)
func (*AggTagSetCursor) NextAggData ¶
func (*AggTagSetCursor) NextWithMultipleSeries ¶ added in v1.0.0
func (s *AggTagSetCursor) NextWithMultipleSeries() (*record.Record, comm.SeriesInfoIntf, error)
func (*AggTagSetCursor) NextWithSingleSeries ¶ added in v1.0.0
func (s *AggTagSetCursor) NextWithSingleSeries() (*record.Record, comm.SeriesInfoIntf, error)
func (*AggTagSetCursor) RecordInit ¶
func (s *AggTagSetCursor) RecordInit() error
func (*AggTagSetCursor) SetOps ¶
func (s *AggTagSetCursor) SetOps(ops []*comm.CallOption)
func (*AggTagSetCursor) SetParaForTest ¶ added in v1.0.0
func (s *AggTagSetCursor) SetParaForTest(schema record.Schemas)
func (*AggTagSetCursor) SetSchema ¶
func (s *AggTagSetCursor) SetSchema(schema record.Schemas)
func (*AggTagSetCursor) SinkPlan ¶
func (s *AggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)
func (*AggTagSetCursor) StartSpan ¶
func (s *AggTagSetCursor) StartSpan(span *tracing.Span)
func (*AggTagSetCursor) TagAuxHandler ¶
func (s *AggTagSetCursor) TagAuxHandler(re *record.Record, start, end int)
func (*AggTagSetCursor) TimeWindowsInit ¶
func (s *AggTagSetCursor) TimeWindowsInit()
func (*AggTagSetCursor) UpdateRec ¶ added in v1.0.0
func (s *AggTagSetCursor) UpdateRec(recRow, chunkRow int)
type ChunkMetaByField ¶ added in v1.0.0
type ChunkMetaByField struct {
// contains filtered or unexported fields
}
ChunkMetaByField build from each chunkmeta. ChunkMetaByField obtain data by sid column by column
func NewChunkMetaByField ¶ added in v1.0.0
func NewChunkMetaByField(file immutable.TSSPFile, fieldIter *FieldIter, chunkMeta immutable.ChunkMeta, recordPool *record.CircularRecordPool) *ChunkMetaByField
type ChunkMetaByFieldIters ¶ added in v1.0.0
type ChunkMetaByFieldIters struct {
// contains filtered or unexported fields
}
ChunkMetaByFieldIters is the iterator of ChunkMetaByField.
func NewChunkMetaByFieldIters ¶ added in v1.0.0
func NewChunkMetaByFieldIters(chunkMetas []immutable.ChunkMeta, file immutable.TSSPFile, fieldIter *FieldIter, recordPool *record.CircularRecordPool) *ChunkMetaByFieldIters
type ChunkReader ¶
type ChunkReader struct {
executor.BaseProcessor
Output *executor.ChunkPort
ResultChunkPool *executor.CircularChunkPool
// contains filtered or unexported fields
}
func (*ChunkReader) Abort ¶ added in v1.0.1
func (r *ChunkReader) Abort()
func (*ChunkReader) Close ¶
func (r *ChunkReader) Close()
func (*ChunkReader) Create ¶
func (r *ChunkReader) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
func (*ChunkReader) Explain ¶
func (r *ChunkReader) Explain() []executor.ValuePair
func (*ChunkReader) GetInputNumber ¶
func (r *ChunkReader) GetInputNumber(executor.Port) int
func (*ChunkReader) GetInputs ¶
func (r *ChunkReader) GetInputs() executor.Ports
func (*ChunkReader) GetOutputNumber ¶
func (r *ChunkReader) GetOutputNumber(executor.Port) int
func (*ChunkReader) GetOutputs ¶
func (r *ChunkReader) GetOutputs() executor.Ports
func (*ChunkReader) IsSink ¶
func (r *ChunkReader) IsSink() bool
func (*ChunkReader) Name ¶
func (r *ChunkReader) Name() string
func (*ChunkReader) Release ¶
func (r *ChunkReader) Release() error
type CoProcessor ¶
type CoProcessor interface {
WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}
type CoProcessorImpl ¶
type CoProcessorImpl struct {
Routines []Routine
}
func NewCoProcessorImpl ¶
func NewCoProcessorImpl(routines ...Routine) *CoProcessorImpl
func (*CoProcessorImpl) AppendRoutine ¶
func (p *CoProcessorImpl) AppendRoutine(routines ...Routine)
func (*CoProcessorImpl) WorkOnRecord ¶
func (p *CoProcessorImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)
type Compactor ¶
type Compactor struct {
// contains filtered or unexported fields
}
func NewCompactor ¶
func NewCompactor() *Compactor
func (*Compactor) RegisterShard ¶
func (c *Compactor) RegisterShard(sh *shard)
func (*Compactor) SetAllOutOfOrderMergeSwitch ¶
func (*Compactor) SetAllShardsCompactionSwitch ¶
func (*Compactor) SetSnapshotColdDuration ¶
func (*Compactor) ShardCompactionSwitch ¶
func (*Compactor) ShardOutOfOrderMergeSwitch ¶
func (*Compactor) UnregisterShard ¶
type DBPTInfo ¶
type DBPTInfo struct {
// contains filtered or unexported fields
}
func NewDBPTInfo ¶
func (*DBPTInfo) LoadAllShards ¶
func (*DBPTInfo) NewShard ¶
func (dbPT *DBPTInfo) NewShard(rp string, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo, client metaclient.MetaClient) (Shard, error)
func (*DBPTInfo) OpenShards ¶
func (dbPT *DBPTInfo) OpenShards(opId uint64, rp string, durationInfos map[uint64]*meta.ShardDurationInfo, loadStat int, client metaclient.MetaClient) error
func (*DBPTInfo) SetOption ¶
func (dbPT *DBPTInfo) SetOption(opt netstorage.EngineOptions)
type DataBlockInfo ¶
type DataBlockInfo struct {
// contains filtered or unexported fields
}
type DownSampleFilesInfo ¶ added in v1.0.0
type EndPointPair ¶
type Engine ¶
type Engine struct {
ReadOnly bool
DBPartitions map[string]map[uint32]*DBPTInfo
DownSamplePolicies map[string]*meta2.StoreDownSamplePolicy
// contains filtered or unexported fields
}
func (*Engine) Assign ¶ added in v1.0.0
func (e *Engine) Assign(opId uint64, db string, ptId uint32, ver uint64, durationInfos map[uint64]*meta2.ShardDurationInfo, client metaclient.MetaClient) error
func (*Engine) ChangeShardTierToWarm ¶
func (*Engine) CreateDBPT ¶
func (*Engine) CreateLogicalPlan ¶
func (*Engine) CreateShard ¶
func (*Engine) DeleteIndex ¶
func (*Engine) DeleteShard ¶
todo:need confirm
func (*Engine) DropMeasurement ¶
func (*Engine) DropRetentionPolicy ¶
func (*Engine) DropSeries ¶
func (*Engine) ExpiredIndexes ¶
func (e *Engine) ExpiredIndexes() []*meta2.IndexIdentifier
func (*Engine) ExpiredShards ¶
func (e *Engine) ExpiredShards() []*meta2.ShardIdentifier
func (*Engine) FetchShardsNeedChangeStore ¶
func (e *Engine) FetchShardsNeedChangeStore() (shardsToWarm, shardsToCold []*meta2.ShardIdentifier)
func (*Engine) ForceFlush ¶
func (e *Engine) ForceFlush()
func (*Engine) GetDownSamplePolicy ¶ added in v1.0.0
func (e *Engine) GetDownSamplePolicy(key string) *meta2.StoreDownSamplePolicy
func (*Engine) GetLockFile ¶
func (*Engine) GetShardDownSampleLevel ¶ added in v1.0.0
func (*Engine) GetShardDownSamplePolicyInfos ¶ added in v1.0.0
func (e *Engine) GetShardDownSamplePolicyInfos(meta interface { UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error }) ([]*meta2.ShardDownSamplePolicyInfo, error)
func (*Engine) GetShardSplitPoints ¶
func (*Engine) LogicalPlanCost ¶
func (e *Engine) LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)
func (*Engine) Open ¶
func (e *Engine) Open(durationInfos map[uint64]*meta2.ShardDurationInfo, m meta.MetaClient) error
func (*Engine) PreAssign ¶ added in v1.0.0
func (e *Engine) PreAssign(opId uint64, db string, ptId uint32, durationInfos map[uint64]*meta2.ShardDurationInfo, client metaclient.MetaClient) error
func (*Engine) PreOffload ¶ added in v1.0.0
func (*Engine) RemoveDeadDownSamplePolicy ¶ added in v1.0.0
func (e *Engine) RemoveDeadDownSamplePolicy()
func (*Engine) ResetDownSampleFlag ¶ added in v1.0.0
func (e *Engine) ResetDownSampleFlag()
func (*Engine) RollbackPreOffload ¶ added in v1.0.0
func (*Engine) SeriesCardinality ¶
func (*Engine) SeriesExactCardinality ¶
func (*Engine) SeriesKeys ¶
func (*Engine) SetReadOnly ¶
func (*Engine) StartDownSampleTask ¶ added in v1.0.0
func (*Engine) SysCtrl ¶
func (e *Engine) SysCtrl(req *netstorage.SysCtrlRequest) error
func (*Engine) TagValues ¶
func (e *Engine) TagValues(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr) (netstorage.TablesTagSets, error)
func (*Engine) TagValuesCardinality ¶
func (*Engine) UpdateDownSampleInfo ¶ added in v1.0.0
func (e *Engine) UpdateDownSampleInfo(policies *meta2.DownSamplePoliciesInfoWithDbRp)
func (*Engine) UpdateShardDownSampleInfo ¶ added in v1.0.0
func (e *Engine) UpdateShardDownSampleInfo(infos *meta2.ShardDownSampleUpdateInfos)
func (*Engine) UpdateShardDurationInfo ¶
func (e *Engine) UpdateShardDurationInfo(info *meta2.ShardDurationInfo) error
func (*Engine) UpdateStoreDownSamplePolicies ¶ added in v1.0.0
func (e *Engine) UpdateStoreDownSamplePolicies(info *meta2.DownSamplePolicyInfo, ident string)
type FieldIter ¶ added in v1.0.0
type FieldIter struct {
// contains filtered or unexported fields
}
func NewFieldIter ¶ added in v1.0.0
func NewFieldIter(querySchema *executor.QuerySchema) *FieldIter
func (*FieldIter) GetRecordSchemas ¶ added in v1.0.0
type FileSequenceAggregator ¶ added in v1.0.0
type FileSequenceAggregator struct {
executor.BaseProcessor
Input *executor.SeriesRecordPort
Output *executor.SeriesRecordPort
// contains filtered or unexported fields
}
func (*FileSequenceAggregator) Aggregate ¶ added in v1.0.0
func (r *FileSequenceAggregator) Aggregate()
func (*FileSequenceAggregator) AggregateSameSchema ¶ added in v1.0.0
func (r *FileSequenceAggregator) AggregateSameSchema() error
func (*FileSequenceAggregator) Close ¶ added in v1.0.0
func (r *FileSequenceAggregator) Close()
func (*FileSequenceAggregator) Create ¶ added in v1.0.0
func (r *FileSequenceAggregator) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
func (*FileSequenceAggregator) Explain ¶ added in v1.0.0
func (r *FileSequenceAggregator) Explain() []executor.ValuePair
func (*FileSequenceAggregator) GetInputNumber ¶ added in v1.0.0
func (r *FileSequenceAggregator) GetInputNumber(executor.Port) int
func (*FileSequenceAggregator) GetInputs ¶ added in v1.0.0
func (r *FileSequenceAggregator) GetInputs() executor.Ports
func (*FileSequenceAggregator) GetOutputNumber ¶ added in v1.0.0
func (r *FileSequenceAggregator) GetOutputNumber(executor.Port) int
func (*FileSequenceAggregator) GetOutputs ¶ added in v1.0.0
func (r *FileSequenceAggregator) GetOutputs() executor.Ports
func (*FileSequenceAggregator) GetProcessors ¶ added in v1.0.0
func (r *FileSequenceAggregator) GetProcessors()
func (*FileSequenceAggregator) IsSink ¶ added in v1.0.0
func (r *FileSequenceAggregator) IsSink() bool
func (*FileSequenceAggregator) Name ¶ added in v1.0.0
func (r *FileSequenceAggregator) Name() string
func (*FileSequenceAggregator) Release ¶ added in v1.0.0
func (r *FileSequenceAggregator) Release() error
func (*FileSequenceAggregator) SendRecord ¶ added in v1.0.0
func (r *FileSequenceAggregator) SendRecord(re *record.Record)
type FloatHeapItem ¶
type FloatHeapItem struct {
// contains filtered or unexported fields
}
func NewFloatHeapItem ¶
func NewFloatHeapItem(n int, cmpByValue, cmpByTime func(a, b *FloatPointItem) bool) *FloatHeapItem
func (*FloatHeapItem) Len ¶
func (f *FloatHeapItem) Len() int
func (*FloatHeapItem) Less ¶
func (f *FloatHeapItem) Less(i, j int) bool
func (*FloatHeapItem) Pop ¶
func (f *FloatHeapItem) Pop() interface{}
func (*FloatHeapItem) Push ¶
func (f *FloatHeapItem) Push(x interface{})
func (*FloatHeapItem) Reset ¶
func (f *FloatHeapItem) Reset()
func (*FloatHeapItem) Swap ¶
func (f *FloatHeapItem) Swap(i, j int)
type FloatPointItem ¶
type FloatPointItem struct {
// contains filtered or unexported fields
}
func NewFloatPointItem ¶
func NewFloatPointItem(time int64, value float64) *FloatPointItem
type IntegerHeapItem ¶
type IntegerHeapItem struct {
// contains filtered or unexported fields
}
func NewIntegerHeapItem ¶
func NewIntegerHeapItem(n int, cmpByValue, cmpByTime func(a, b *IntegerPointItem) bool) *IntegerHeapItem
func (*IntegerHeapItem) Len ¶
func (f *IntegerHeapItem) Len() int
func (*IntegerHeapItem) Less ¶
func (f *IntegerHeapItem) Less(i, j int) bool
func (*IntegerHeapItem) Pop ¶
func (f *IntegerHeapItem) Pop() interface{}
func (*IntegerHeapItem) Push ¶
func (f *IntegerHeapItem) Push(x interface{})
func (*IntegerHeapItem) Reset ¶
func (f *IntegerHeapItem) Reset()
func (*IntegerHeapItem) Swap ¶
func (f *IntegerHeapItem) Swap(i, j int)
type IntegerPointItem ¶
type IntegerPointItem struct {
// contains filtered or unexported fields
}
func NewIntegerPointItem ¶
func NewIntegerPointItem(time int64, value int64) *IntegerPointItem
type LogReplay ¶ added in v1.0.1
type LogReplay struct {
// contains filtered or unexported fields
}
type LogReplays ¶ added in v1.0.1
type LogReplays []LogReplay
type LogWriters ¶ added in v1.0.1
type LogWriters []LogWriter
type MetaIndexIterator ¶ added in v1.0.0
type MetaIndexIterator struct {
// contains filtered or unexported fields
}
func NewMetaIndexIterators ¶ added in v1.0.0
func NewMetaIndexIterators(file immutable.TSSPFile, querySchema *executor.QuerySchema) (*MetaIndexIterator, error)
type PreAggTagSetCursor ¶
type PreAggTagSetCursor struct {
// contains filtered or unexported fields
}
func NewPreAggTagSetCursor ¶
func NewPreAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor) *PreAggTagSetCursor
func (*PreAggTagSetCursor) Close ¶
func (s *PreAggTagSetCursor) Close() error
func (*PreAggTagSetCursor) EndSpan ¶
func (s *PreAggTagSetCursor) EndSpan()
func (*PreAggTagSetCursor) GetSchema ¶
func (s *PreAggTagSetCursor) GetSchema() record.Schemas
func (*PreAggTagSetCursor) Name ¶
func (s *PreAggTagSetCursor) Name() string
func (*PreAggTagSetCursor) Next ¶
func (s *PreAggTagSetCursor) Next() (*record.Record, comm.SeriesInfoIntf, error)
func (*PreAggTagSetCursor) NextAggData ¶
func (*PreAggTagSetCursor) RecordInitPreAgg ¶
func (s *PreAggTagSetCursor) RecordInitPreAgg() error
func (*PreAggTagSetCursor) SetOps ¶
func (s *PreAggTagSetCursor) SetOps(ops []*comm.CallOption)
func (*PreAggTagSetCursor) SetSchema ¶
func (s *PreAggTagSetCursor) SetSchema(schema record.Schemas)
func (*PreAggTagSetCursor) SinkPlan ¶
func (s *PreAggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)
func (*PreAggTagSetCursor) StartSpan ¶
func (s *PreAggTagSetCursor) StartSpan(span *tracing.Span)
type Reducer ¶
type Reducer interface {
Aggregate(*ReducerEndpoint, *ReducerParams)
}
type ReducerEndpoint ¶
type ReducerEndpoint struct {
InputPoint EndPointPair
OutputPoint EndPointPair
}
type ReducerParams ¶
type ReducerParams struct {
// contains filtered or unexported fields
}
type Routine ¶
type Routine interface {
WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}
type RoutineImpl ¶
type RoutineImpl struct {
// contains filtered or unexported fields
}
func NewRoutineImpl ¶
func NewRoutineImpl(reducer Reducer, inOrdinal int, outOrdinal int) *RoutineImpl
func (*RoutineImpl) WorkOnRecord ¶
func (r *RoutineImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)
type SeriesIter ¶
type SeriesIter struct {
// contains filtered or unexported fields
}
type Shard ¶
type Shard interface {
WriteRows(rows []influx.Row, binaryRows []byte) error
ForceFlush()
MaxTime() int64
Count() uint64
SeriesCount() int
Close() error
Ref()
UnRef()
CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)
LogicalPlanCost(sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)
GetSplitPoints(idxes []int64) ([]string, error)
Expired() bool
TierDurationExpired() (tier uint64, expired bool)
RPName() string
StartDownSample(taskID uint64, level int, sdsp *meta.ShardDownSamplePolicyInfo, meta interface {
UpdateShardDownSampleInfo(Ident *meta.ShardIdentifier) error
}) error
GetTSSPFiles(mm string, isOrder bool) (*immutable.TSSPFiles, bool)
GetIndexBuild() *tsi.IndexBuilder
GetID() uint64
Open(client metaclient.MetaClient) error
DataPath() string
WalPath() string
TableStore() immutable.TablesStore
Ident() *meta.ShardIdentifier
Duration() *meta.DurationDescriptor
ChangeShardTierToWarm()
SetWriteColdDuration(duration time.Duration)
SetMutableSizeLimit(size int64)
IsOutOfOrderFilesExist() bool
DropMeasurement(ctx context.Context, name string) error
Statistics(buffer []byte) ([]byte, error)
NewShardKeyIdx(shardType, dataPath string, lockPath *string) error
GetShardDownSamplePolicy(policy *meta.DownSamplePolicyInfo) *meta.ShardDownSamplePolicyInfo
SetShardDownSampleLevel(i int)
UpdateDownSampleOnShard(id uint64, level int)
NewDownSampleTask(sdsp *meta.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger)
UpdateShardReadOnly(meta interface {
UpdateShardDownSampleInfo(Ident *meta.ShardIdentifier) error
}) error
DisableDownSample()
EnableDownSample()
CanDoDownSample() bool
CompactionEnabled() bool
DisableCompAndMerge()
EnableCompAndMerge()
Compact() error
WaitWriteFinish()
SetIndexBuilder(builder *tsi.IndexBuilder)
CloseIndexBuilder() error
Scan(span *tracing.Span, schema *executor.QuerySchema, callBack func(num int64) error) (tsi.GroupSeries, int64, error)
CreateCursor(ctx context.Context, schema *executor.QuerySchema) ([]comm.KeyCursor, error)
// contains filtered or unexported methods
}
type ShardTaskContent ¶ added in v1.0.0
type ShardTaskContent struct {
// contains filtered or unexported fields
}
type TagSetCursorItem ¶
type TagSetCursorItem struct {
// contains filtered or unexported fields
}
func (TagSetCursorItem) GetNewRecord ¶
func (c TagSetCursorItem) GetNewRecord() (*record.Record, error)
type TsspSequenceReader ¶ added in v1.0.0
type TsspSequenceReader struct {
executor.BaseProcessor
Output *executor.SeriesRecordPort
// contains filtered or unexported fields
}
func (*TsspSequenceReader) Close ¶ added in v1.0.0
func (r *TsspSequenceReader) Close()
func (*TsspSequenceReader) Create ¶ added in v1.0.0
func (r *TsspSequenceReader) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
func (*TsspSequenceReader) Explain ¶ added in v1.0.0
func (r *TsspSequenceReader) Explain() []executor.ValuePair
func (*TsspSequenceReader) GetInputNumber ¶ added in v1.0.0
func (r *TsspSequenceReader) GetInputNumber(executor.Port) int
func (*TsspSequenceReader) GetInputs ¶ added in v1.0.0
func (r *TsspSequenceReader) GetInputs() executor.Ports
func (*TsspSequenceReader) GetOutputNumber ¶ added in v1.0.0
func (r *TsspSequenceReader) GetOutputNumber(executor.Port) int
func (*TsspSequenceReader) GetOutputs ¶ added in v1.0.0
func (r *TsspSequenceReader) GetOutputs() executor.Ports
func (*TsspSequenceReader) IsSink ¶ added in v1.0.0
func (r *TsspSequenceReader) IsSink() bool
func (*TsspSequenceReader) Name ¶ added in v1.0.0
func (r *TsspSequenceReader) Name() string
func (*TsspSequenceReader) Release ¶ added in v1.0.0
func (r *TsspSequenceReader) Release() error
type WAL ¶
type WAL struct {
// contains filtered or unexported fields
}
type WriteIntoStorageTransform ¶ added in v1.0.0
type WriteIntoStorageTransform struct {
executor.BaseProcessor
Input *executor.SeriesRecordPort
Output *executor.DownSampleStatePort
// contains filtered or unexported fields
}
func (*WriteIntoStorageTransform) Close ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) Close()
func (*WriteIntoStorageTransform) Create ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) Create(plan executor.LogicalPlan, opt query.ProcessorOptions) (executor.Processor, error)
func (*WriteIntoStorageTransform) EndFile ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) EndFile() error
func (*WriteIntoStorageTransform) Explain ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) Explain() []executor.ValuePair
func (*WriteIntoStorageTransform) GetClosed ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) GetClosed() chan struct{}
func (*WriteIntoStorageTransform) GetInputNumber ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) GetInputNumber(executor.Port) int
func (*WriteIntoStorageTransform) GetInputs ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) GetInputs() executor.Ports
func (*WriteIntoStorageTransform) GetOutputNumber ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) GetOutputNumber(executor.Port) int
func (*WriteIntoStorageTransform) GetOutputs ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) GetOutputs() executor.Ports
func (*WriteIntoStorageTransform) GetRowCount ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) GetRowCount() int
func (*WriteIntoStorageTransform) Name ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) Name() string
func (*WriteIntoStorageTransform) Release ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) Release() error
func (*WriteIntoStorageTransform) SetTaskId ¶ added in v1.0.0
func (r *WriteIntoStorageTransform) SetTaskId(taskID int)
Source Files
¶
- agg_tagset_cursor.go
- aggregate_cursor.go
- compact.go
- coprocessor.go
- downsample_info.go
- engine.go
- engine_ddl.go
- engine_downsample.go
- engine_ha.go
- fence.go
- file_cursor.go
- group_cursor.go
- iterator_plan.go
- iterators.go
- iterators_helper.go
- limit_cursor.go
- log_writer.go
- partition.go
- reader.go
- record_plan.go
- series_agg_func.gen.go
- series_agg_reducer.gen.go
- series_call_processor.go
- series_cursor.go
- shard.go
- shard_task_manager.go
- sysctrl.go
- tagset_cursor.go
- tier.go
- topn_linkedlist.go
- tsm_merge_cursor.go
- wal.go
- writer.go