engine

package
Version: v1.0.1
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2023 License: Apache-2.0 Imports: 60 Imported by: 0

Documentation

Overview

Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	SidSequenceReaderRecordNum = 6
	SequenceAggRecordNum       = 3
)
View Source
const (
	MaxRetryUpdateOnShardNum = 4

	CRCLen     = 4
	BufferSize = 1024 * 1024
)
View Source
const (
	MaxRegisterNum  = 64
	RegisterSuccess = 1
	RegisterFail    = -1
)
View Source
const (
	Failpoint = "failpoint"
	Readonly  = "readonly"
)
View Source
const (
	TierLeveMem           = 1 // in memory
	TierLeveLocalDisk     = 2
	TierLeveObjectStorage = 3
)
View Source
const (
	DefaultFileSize   = 10 * 1024 * 1024
	WALFileSuffixes   = "wal"
	WalRecordHeadSize = 1 + 4
	WalCompBufSize    = 256 * 1024
	WalCompMaxBufSize = 2 * 1024 * 1024
)

Variables

View Source
var (
	RecordIteratorPool  = &sync.Pool{}
	ChunkMetaBufferPool = bufferpool.NewByteBufferPool(defaultChunkMetaSize)
)
View Source
var AppendManyNils map[int]func(colVal *record.ColVal, count int)
View Source
var (
	DownSampleWriteDrop = true
)
View Source
var (
	IntervalRecordPool = record.NewRecordPool(record.IntervalRecordPool)
)

Functions

func AddLocationsWithFirstTime

func AddLocationsWithFirstTime(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AddLocationsWithInit

func AddLocationsWithInit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) error

func AddLocationsWithLimit

func AddLocationsWithLimit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AppendColumnTimes

func AppendColumnTimes(bitmap []bool, column executor.Column, columnTimes []int64, recCol *record.ColVal)

func AppendNilRowWithTime added in v1.0.0

func AppendNilRowWithTime(rec *record.Record, t int64)

func AppendRecWithNilRows added in v1.0.0

func AppendRecWithNilRows(rec, re *record.Record, opt hybridqp.Options, seriesStart, seriesEnd, shardStart int64, last bool)

func CanNotAggOnSeriesFunc

func CanNotAggOnSeriesFunc(m map[string]*influxql.Call) bool

func GetMaxTime

func GetMaxTime(maxTime int64, rec *record.Record, isAscending bool) int64

func GetMinTime

func GetMinTime(minTime int64, rec *record.Record, isAscending bool) int64

func NewAggregateCursor

func NewAggregateCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, hasAuxTags bool) *aggregateCursor

func NewChunkReader

func NewChunkReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, cursors []interface{}) executor.Processor

func NewEngine

func NewEngine(dataPath, walPath string, options netstorage.EngineOptions, ctx *meta.LoadCtx) (netstorage.Engine, error)

func NewFencer added in v1.0.0

func NewFencer(dataPath, walPath, db string, pt uint32) fencer

func NewFileLoopCursor

func NewFileLoopCursor(ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema,
	tagSet *tsi.TagSetInfo, start, step int, s *shard) *fileLoopCursor

func NewFileSequenceAggregator added in v1.0.0

func NewFileSequenceAggregator(schema hybridqp.Catalog, addPrefix bool, shardStartTime, shardEndTime int64) executor.Processor

func NewFloatColFloatHeapReducer

func NewFloatColFloatHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, floatHeapItem *FloatHeapItem) *floatColFloatHeapReducer

func NewIntegerColIntegerHeapReducer

func NewIntegerColIntegerHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, integerHeapItem *IntegerHeapItem) *integerColIntegerHeapReducer

func NewLimitCursor

func NewLimitCursor(schema *executor.QuerySchema, helper func(start, end int, src, des *record.Record)) *limitCursor

func NewRecordSchema

func NewRecordSchema(querySchema *executor.QuerySchema, auxTags []string, schema record.Schemas, filterConditions []*influxql.VarRef) ([]string, record.Schemas)

func NewSeriesInfoPool

func NewSeriesInfoPool(num int64) *filesInfoPool

func NewShard

func NewShard(dataPath, walPath string, lockPath *string, ident *meta.ShardIdentifier, durationInfo *meta.DurationDescriptor, tr *meta.TimeRangeInfo,
	options netstorage.EngineOptions) *shard

func NewTagSetCursorForTest

func NewTagSetCursorForTest(schema *executor.QuerySchema, seriesN int) *tagSetCursor

NewTagSetCursorForTest is intended for unit tests only and will be removed later.

func NewTopNLinkedList

func NewTopNLinkedList(n int, ascending bool) *topNLinkedList

func NewTsspSequenceReader added in v1.0.0

func NewTsspSequenceReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, files *immutable.TSSPFiles, newSeqs []uint64, stop chan struct{}) executor.Processor

func NewWriteIntoStorageTransform added in v1.0.0

func NewWriteIntoStorageTransform(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, conf *immutable.Config, m *immutable.MmsTables, addPrefix bool) executor.Processor

func RecordCutNormal

func RecordCutNormal(start, end int, src, dst *record.Record)

func RegisterShardTask added in v1.0.0

func RegisterShardTask(taskContent *ShardTaskContent, f func(content *ShardTaskContent) error) int

func RemoveTask added in v1.0.0

func RemoveTask(shardID uint64)

func SetFullCompColdDuration

func SetFullCompColdDuration(d time.Duration)

func SetNextMethod added in v1.0.0

func SetNextMethod(cursor comm.KeyCursor)

SetNextMethod is intended for use in tests only.

func StartRegisterTask added in v1.0.0

func StartRegisterTask(id uint64) error

Types

type AggTagSetCursor

type AggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewAggTagSetCursor

func NewAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor, singleSeries bool) *AggTagSetCursor

func (*AggTagSetCursor) Close

func (s *AggTagSetCursor) Close() error

func (*AggTagSetCursor) EndSpan

func (s *AggTagSetCursor) EndSpan()

func (*AggTagSetCursor) GetIndex added in v1.0.0

func (s *AggTagSetCursor) GetIndex(t int64) int64

func (*AggTagSetCursor) GetSchema

func (s *AggTagSetCursor) GetSchema() record.Schemas

func (*AggTagSetCursor) Init

func (s *AggTagSetCursor) Init()

func (*AggTagSetCursor) Name

func (s *AggTagSetCursor) Name() string

func (*AggTagSetCursor) Next

func (*AggTagSetCursor) NextAggData

func (s *AggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*AggTagSetCursor) NextWithMultipleSeries added in v1.0.0

func (s *AggTagSetCursor) NextWithMultipleSeries() (*record.Record, comm.SeriesInfoIntf, error)

func (*AggTagSetCursor) NextWithSingleSeries added in v1.0.0

func (s *AggTagSetCursor) NextWithSingleSeries() (*record.Record, comm.SeriesInfoIntf, error)

func (*AggTagSetCursor) RecordInit

func (s *AggTagSetCursor) RecordInit() error

func (*AggTagSetCursor) SetOps

func (s *AggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*AggTagSetCursor) SetParaForTest added in v1.0.0

func (s *AggTagSetCursor) SetParaForTest(schema record.Schemas)

func (*AggTagSetCursor) SetSchema

func (s *AggTagSetCursor) SetSchema(schema record.Schemas)

func (*AggTagSetCursor) SinkPlan

func (s *AggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*AggTagSetCursor) StartSpan

func (s *AggTagSetCursor) StartSpan(span *tracing.Span)

func (*AggTagSetCursor) TagAuxHandler

func (s *AggTagSetCursor) TagAuxHandler(re *record.Record, start, end int)

func (*AggTagSetCursor) TimeWindowsInit

func (s *AggTagSetCursor) TimeWindowsInit()

func (*AggTagSetCursor) UpdateRec added in v1.0.0

func (s *AggTagSetCursor) UpdateRec(recRow, chunkRow int)

type ChunkMetaByField added in v1.0.0

type ChunkMetaByField struct {
	// contains filtered or unexported fields
}

ChunkMetaByField is built from each chunk meta. It obtains data by sid, column by column.

func NewChunkMetaByField added in v1.0.0

func NewChunkMetaByField(file immutable.TSSPFile, fieldIter *FieldIter, chunkMeta immutable.ChunkMeta, recordPool *record.CircularRecordPool) *ChunkMetaByField

type ChunkMetaByFieldIters added in v1.0.0

type ChunkMetaByFieldIters struct {
	// contains filtered or unexported fields
}

ChunkMetaByFieldIters is the iterator of ChunkMetaByField.

func NewChunkMetaByFieldIters added in v1.0.0

func NewChunkMetaByFieldIters(chunkMetas []immutable.ChunkMeta, file immutable.TSSPFile, fieldIter *FieldIter, recordPool *record.CircularRecordPool) *ChunkMetaByFieldIters

type ChunkReader

type ChunkReader struct {
	executor.BaseProcessor

	Output *executor.ChunkPort

	ResultChunkPool *executor.CircularChunkPool
	// contains filtered or unexported fields
}

func (*ChunkReader) Abort added in v1.0.1

func (r *ChunkReader) Abort()

func (*ChunkReader) Close

func (r *ChunkReader) Close()

func (*ChunkReader) Create

func (*ChunkReader) Explain

func (r *ChunkReader) Explain() []executor.ValuePair

func (*ChunkReader) GetInputNumber

func (r *ChunkReader) GetInputNumber(executor.Port) int

func (*ChunkReader) GetInputs

func (r *ChunkReader) GetInputs() executor.Ports

func (*ChunkReader) GetOutputNumber

func (r *ChunkReader) GetOutputNumber(executor.Port) int

func (*ChunkReader) GetOutputs

func (r *ChunkReader) GetOutputs() executor.Ports

func (*ChunkReader) IsSink

func (r *ChunkReader) IsSink() bool

func (*ChunkReader) Name

func (r *ChunkReader) Name() string

func (*ChunkReader) Release

func (r *ChunkReader) Release() error

func (*ChunkReader) Work

func (r *ChunkReader) Work(ctx context.Context) error

type CoProcessor

type CoProcessor interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type CoProcessorImpl

type CoProcessorImpl struct {
	Routines []Routine
}

func NewCoProcessorImpl

func NewCoProcessorImpl(routines ...Routine) *CoProcessorImpl

func (*CoProcessorImpl) AppendRoutine

func (p *CoProcessorImpl) AppendRoutine(routines ...Routine)

func (*CoProcessorImpl) WorkOnRecord

func (p *CoProcessorImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor() *Compactor

func (*Compactor) RegisterShard

func (c *Compactor) RegisterShard(sh *shard)

func (*Compactor) SetAllOutOfOrderMergeSwitch

func (c *Compactor) SetAllOutOfOrderMergeSwitch(en bool)

func (*Compactor) SetAllShardsCompactionSwitch

func (c *Compactor) SetAllShardsCompactionSwitch(en bool)

func (*Compactor) SetSnapshotColdDuration

func (c *Compactor) SetSnapshotColdDuration(d time.Duration)

func (*Compactor) ShardCompactionSwitch

func (c *Compactor) ShardCompactionSwitch(shid uint64, en bool)

func (*Compactor) ShardOutOfOrderMergeSwitch

func (c *Compactor) ShardOutOfOrderMergeSwitch(shid uint64, en bool)

func (*Compactor) UnregisterShard

func (c *Compactor) UnregisterShard(shardId uint64)

type DBPTInfo

type DBPTInfo struct {
	// contains filtered or unexported fields
}

func NewDBPTInfo

func NewDBPTInfo(db string, id uint32, dataPath, walPath string, ctx *metaclient.LoadCtx) *DBPTInfo

func (*DBPTInfo) LoadAllShards

func (dbPT *DBPTInfo) LoadAllShards(rp string, shardIDs []uint64) error

func (*DBPTInfo) NewShard

func (dbPT *DBPTInfo) NewShard(rp string, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo, client metaclient.MetaClient) (Shard, error)

func (*DBPTInfo) OpenIndexes

func (dbPT *DBPTInfo) OpenIndexes(opId uint64, rp string) error

func (*DBPTInfo) OpenShards

func (dbPT *DBPTInfo) OpenShards(opId uint64, rp string, durationInfos map[uint64]*meta.ShardDurationInfo, loadStat int, client metaclient.MetaClient) error

func (*DBPTInfo) SetOption

func (dbPT *DBPTInfo) SetOption(opt netstorage.EngineOptions)

func (*DBPTInfo) Shard

func (dbPT *DBPTInfo) Shard(id uint64) Shard

type DataBlockInfo

type DataBlockInfo struct {
	// contains filtered or unexported fields
}

type DownSampleFilesInfo added in v1.0.0

type DownSampleFilesInfo struct {
	Names    []string
	OldFiles [][]string
	NewFiles [][]string
	// contains filtered or unexported fields
}

type EndPointPair

type EndPointPair struct {
	Record  *record.Record
	Ordinal int
}

type Engine

type Engine struct {
	ReadOnly bool

	DBPartitions map[string]map[uint32]*DBPTInfo

	DownSamplePolicies map[string]*meta2.StoreDownSamplePolicy
	// contains filtered or unexported fields
}

func (*Engine) Assign added in v1.0.0

func (e *Engine) Assign(opId uint64, db string, ptId uint32, ver uint64, durationInfos map[uint64]*meta2.ShardDurationInfo, client metaclient.MetaClient) error

func (*Engine) ChangeShardTierToWarm

func (e *Engine) ChangeShardTierToWarm(db string, ptId uint32, shardID uint64) error

func (*Engine) Close

func (e *Engine) Close() error

func (*Engine) CreateDBPT

func (e *Engine) CreateDBPT(db string, pt uint32)

func (*Engine) CreateLogicalPlan

func (e *Engine) CreateLogicalPlan(ctx context.Context, db string, ptId uint32, shardID uint64,
	sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

func (*Engine) CreateShard

func (e *Engine) CreateShard(db, rp string, ptId uint32, shardID uint64, timeRangeInfo *meta2.ShardTimeRangeInfo) error

func (*Engine) DbPTRef

func (e *Engine) DbPTRef(db string, ptId uint32) error

func (*Engine) DbPTUnref

func (e *Engine) DbPTUnref(db string, ptId uint32)

func (*Engine) DeleteDatabase

func (e *Engine) DeleteDatabase(db string, ptId uint32) (err error)

func (*Engine) DeleteIndex

func (e *Engine) DeleteIndex(db string, ptId uint32, indexID uint64) error

func (*Engine) DeleteShard

func (e *Engine) DeleteShard(db string, ptId uint32, shardID uint64) error

TODO: this behavior needs to be confirmed.

func (*Engine) DropMeasurement

func (e *Engine) DropMeasurement(db string, rp string, name string, shardIds []uint64) error

func (*Engine) DropRetentionPolicy

func (e *Engine) DropRetentionPolicy(db string, rp string, ptId uint32) error

func (*Engine) DropSeries

func (e *Engine) DropSeries(database string, sources []influxql.Source, ptId []uint32, condition influxql.Expr) (int, error)

func (*Engine) ExpiredIndexes

func (e *Engine) ExpiredIndexes() []*meta2.IndexIdentifier

func (*Engine) ExpiredShards

func (e *Engine) ExpiredShards() []*meta2.ShardIdentifier

func (*Engine) FetchShardsNeedChangeStore

func (e *Engine) FetchShardsNeedChangeStore() (shardsToWarm, shardsToCold []*meta2.ShardIdentifier)

func (*Engine) ForceFlush

func (e *Engine) ForceFlush()

func (*Engine) GetDownSamplePolicy added in v1.0.0

func (e *Engine) GetDownSamplePolicy(key string) *meta2.StoreDownSamplePolicy

func (*Engine) GetLockFile

func (e *Engine) GetLockFile() string

func (*Engine) GetShard added in v1.0.0

func (e *Engine) GetShard(db string, ptId uint32, shardID uint64) Shard

func (*Engine) GetShardDownSampleLevel added in v1.0.0

func (e *Engine) GetShardDownSampleLevel(db string, ptId uint32, shardID uint64) int

func (*Engine) GetShardDownSamplePolicyInfos added in v1.0.0

func (e *Engine) GetShardDownSamplePolicyInfos(meta interface {
	UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error
}) ([]*meta2.ShardDownSamplePolicyInfo, error)

func (*Engine) GetShardSplitPoints

func (e *Engine) GetShardSplitPoints(db string, ptId uint32, shardID uint64, idxes []int64) ([]string, error)

func (*Engine) LogicalPlanCost

func (e *Engine) LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

func (*Engine) Offload added in v1.0.0

func (e *Engine) Offload(db string, ptId uint32) error

func (*Engine) Open

func (e *Engine) Open(durationInfos map[uint64]*meta2.ShardDurationInfo, m meta.MetaClient) error

func (*Engine) PreAssign added in v1.0.0

func (e *Engine) PreAssign(opId uint64, db string, ptId uint32, durationInfos map[uint64]*meta2.ShardDurationInfo, client metaclient.MetaClient) error

func (*Engine) PreOffload added in v1.0.0

func (e *Engine) PreOffload(db string, ptId uint32) error

func (*Engine) RemoveDeadDownSamplePolicy added in v1.0.0

func (e *Engine) RemoveDeadDownSamplePolicy()

func (*Engine) ResetDownSampleFlag added in v1.0.0

func (e *Engine) ResetDownSampleFlag()

func (*Engine) RollbackPreOffload added in v1.0.0

func (e *Engine) RollbackPreOffload(db string, ptId uint32) error

func (*Engine) SeriesCardinality

func (e *Engine) SeriesCardinality(db string, ptIDs []uint32, namesWithVer [][]byte, condition influxql.Expr) ([]meta2.MeasurementCardinalityInfo, error)

func (*Engine) SeriesExactCardinality

func (e *Engine) SeriesExactCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) (map[string]uint64, error)

func (*Engine) SeriesKeys

func (e *Engine) SeriesKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) ([]string, error)

func (*Engine) SetReadOnly

func (e *Engine) SetReadOnly(readonly bool)

func (*Engine) StartDownSampleTask added in v1.0.0

func (e *Engine) StartDownSampleTask(sdsp *meta2.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger,
	meta interface {
		UpdateShardDownSampleInfo(Ident *meta2.ShardIdentifier) error
	}) error

func (*Engine) Statistics

func (e *Engine) Statistics(buffer []byte) ([]byte, error)

func (*Engine) SysCtrl

func (e *Engine) SysCtrl(req *netstorage.SysCtrlRequest) error

func (*Engine) TagValues

func (e *Engine) TagValues(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr) (netstorage.TablesTagSets, error)

func (*Engine) TagValuesCardinality

func (e *Engine) TagValuesCardinality(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr) (map[string]uint64, error)

func (*Engine) UpdateDownSampleInfo added in v1.0.0

func (e *Engine) UpdateDownSampleInfo(policies *meta2.DownSamplePoliciesInfoWithDbRp)

func (*Engine) UpdateShardDownSampleInfo added in v1.0.0

func (e *Engine) UpdateShardDownSampleInfo(infos *meta2.ShardDownSampleUpdateInfos)

func (*Engine) UpdateShardDurationInfo

func (e *Engine) UpdateShardDurationInfo(info *meta2.ShardDurationInfo) error

func (*Engine) UpdateStoreDownSamplePolicies added in v1.0.0

func (e *Engine) UpdateStoreDownSamplePolicies(info *meta2.DownSamplePolicyInfo, ident string)

func (*Engine) WriteRows

func (e *Engine) WriteRows(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, binaryRows []byte) error

type FieldIter added in v1.0.0

type FieldIter struct {
	// contains filtered or unexported fields
}

func NewFieldIter added in v1.0.0

func NewFieldIter(querySchema *executor.QuerySchema) *FieldIter

func (*FieldIter) GetRecordSchemas added in v1.0.0

func (r *FieldIter) GetRecordSchemas() record.Schemas

func (*FieldIter) ResetPos added in v1.0.0

func (r *FieldIter) ResetPos()

type FileSequenceAggregator added in v1.0.0

type FileSequenceAggregator struct {
	executor.BaseProcessor

	Input  *executor.SeriesRecordPort
	Output *executor.SeriesRecordPort
	// contains filtered or unexported fields
}

func (*FileSequenceAggregator) Aggregate added in v1.0.0

func (r *FileSequenceAggregator) Aggregate()

func (*FileSequenceAggregator) AggregateSameSchema added in v1.0.0

func (r *FileSequenceAggregator) AggregateSameSchema() error

func (*FileSequenceAggregator) Close added in v1.0.0

func (r *FileSequenceAggregator) Close()

func (*FileSequenceAggregator) Create added in v1.0.0

func (*FileSequenceAggregator) Explain added in v1.0.0

func (r *FileSequenceAggregator) Explain() []executor.ValuePair

func (*FileSequenceAggregator) GetInputNumber added in v1.0.0

func (r *FileSequenceAggregator) GetInputNumber(executor.Port) int

func (*FileSequenceAggregator) GetInputs added in v1.0.0

func (r *FileSequenceAggregator) GetInputs() executor.Ports

func (*FileSequenceAggregator) GetOutputNumber added in v1.0.0

func (r *FileSequenceAggregator) GetOutputNumber(executor.Port) int

func (*FileSequenceAggregator) GetOutputs added in v1.0.0

func (r *FileSequenceAggregator) GetOutputs() executor.Ports

func (*FileSequenceAggregator) GetProcessors added in v1.0.0

func (r *FileSequenceAggregator) GetProcessors()

func (*FileSequenceAggregator) IsSink added in v1.0.0

func (r *FileSequenceAggregator) IsSink() bool

func (*FileSequenceAggregator) Name added in v1.0.0

func (r *FileSequenceAggregator) Name() string

func (*FileSequenceAggregator) Release added in v1.0.0

func (r *FileSequenceAggregator) Release() error

func (*FileSequenceAggregator) SendRecord added in v1.0.0

func (r *FileSequenceAggregator) SendRecord(re *record.Record)

func (*FileSequenceAggregator) Work added in v1.0.0

type FloatHeapItem

type FloatHeapItem struct {
	// contains filtered or unexported fields
}

func NewFloatHeapItem

func NewFloatHeapItem(n int, cmpByValue, cmpByTime func(a, b *FloatPointItem) bool) *FloatHeapItem

func (*FloatHeapItem) Len

func (f *FloatHeapItem) Len() int

func (*FloatHeapItem) Less

func (f *FloatHeapItem) Less(i, j int) bool

func (*FloatHeapItem) Pop

func (f *FloatHeapItem) Pop() interface{}

func (*FloatHeapItem) Push

func (f *FloatHeapItem) Push(x interface{})

func (*FloatHeapItem) Reset

func (f *FloatHeapItem) Reset()

func (*FloatHeapItem) Swap

func (f *FloatHeapItem) Swap(i, j int)

type FloatPointItem

type FloatPointItem struct {
	// contains filtered or unexported fields
}

func NewFloatPointItem

func NewFloatPointItem(time int64, value float64) *FloatPointItem

type IntegerHeapItem

type IntegerHeapItem struct {
	// contains filtered or unexported fields
}

func NewIntegerHeapItem

func NewIntegerHeapItem(n int, cmpByValue, cmpByTime func(a, b *IntegerPointItem) bool) *IntegerHeapItem

func (*IntegerHeapItem) Len

func (f *IntegerHeapItem) Len() int

func (*IntegerHeapItem) Less

func (f *IntegerHeapItem) Less(i, j int) bool

func (*IntegerHeapItem) Pop

func (f *IntegerHeapItem) Pop() interface{}

func (*IntegerHeapItem) Push

func (f *IntegerHeapItem) Push(x interface{})

func (*IntegerHeapItem) Reset

func (f *IntegerHeapItem) Reset()

func (*IntegerHeapItem) Swap

func (f *IntegerHeapItem) Swap(i, j int)

type IntegerPointItem

type IntegerPointItem struct {
	// contains filtered or unexported fields
}

func NewIntegerPointItem

func NewIntegerPointItem(time int64, value int64) *IntegerPointItem

type LogReplay added in v1.0.1

type LogReplay struct {
	// contains filtered or unexported fields
}

type LogReplays added in v1.0.1

type LogReplays []LogReplay

type LogWriter

type LogWriter struct {
	SyncInterval time.Duration
	// contains filtered or unexported fields
}

func (*LogWriter) Switch

func (w *LogWriter) Switch() ([]string, error)

func (*LogWriter) Write

func (w *LogWriter) Write(compBuf []byte) error

type LogWriters added in v1.0.1

type LogWriters []LogWriter

type MetaIndexIterator added in v1.0.0

type MetaIndexIterator struct {
	// contains filtered or unexported fields
}

func NewMetaIndexIterators added in v1.0.0

func NewMetaIndexIterators(file immutable.TSSPFile, querySchema *executor.QuerySchema) (*MetaIndexIterator, error)

type PreAggTagSetCursor

type PreAggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewPreAggTagSetCursor

func NewPreAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor) *PreAggTagSetCursor

func (*PreAggTagSetCursor) Close

func (s *PreAggTagSetCursor) Close() error

func (*PreAggTagSetCursor) EndSpan

func (s *PreAggTagSetCursor) EndSpan()

func (*PreAggTagSetCursor) GetSchema

func (s *PreAggTagSetCursor) GetSchema() record.Schemas

func (*PreAggTagSetCursor) Name

func (s *PreAggTagSetCursor) Name() string

func (*PreAggTagSetCursor) Next

func (*PreAggTagSetCursor) NextAggData

func (s *PreAggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*PreAggTagSetCursor) RecordInitPreAgg

func (s *PreAggTagSetCursor) RecordInitPreAgg() error

func (*PreAggTagSetCursor) SetOps

func (s *PreAggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*PreAggTagSetCursor) SetSchema

func (s *PreAggTagSetCursor) SetSchema(schema record.Schemas)

func (*PreAggTagSetCursor) SinkPlan

func (s *PreAggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*PreAggTagSetCursor) StartSpan

func (s *PreAggTagSetCursor) StartSpan(span *tracing.Span)

type PtNNLock

type PtNNLock struct {
}

type Reducer

type Reducer interface {
	Aggregate(*ReducerEndpoint, *ReducerParams)
}

type ReducerEndpoint

type ReducerEndpoint struct {
	InputPoint  EndPointPair
	OutputPoint EndPointPair
}

type ReducerParams

type ReducerParams struct {
	// contains filtered or unexported fields
}

type Routine

type Routine interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type RoutineImpl

type RoutineImpl struct {
	// contains filtered or unexported fields
}

func NewRoutineImpl

func NewRoutineImpl(reducer Reducer, inOrdinal int, outOrdinal int) *RoutineImpl

func (*RoutineImpl) WorkOnRecord

func (r *RoutineImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)

type SeriesIter

type SeriesIter struct {
	// contains filtered or unexported fields
}

type Shard

type Shard interface {
	WriteRows(rows []influx.Row, binaryRows []byte) error

	ForceFlush()

	MaxTime() int64

	Count() uint64

	SeriesCount() int

	Close() error

	Ref()

	UnRef()

	CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

	LogicalPlanCost(sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

	GetSplitPoints(idxes []int64) ([]string, error)

	Expired() bool

	TierDurationExpired() (tier uint64, expired bool)

	RPName() string

	StartDownSample(taskID uint64, level int, sdsp *meta.ShardDownSamplePolicyInfo, meta interface {
		UpdateShardDownSampleInfo(Ident *meta.ShardIdentifier) error
	}) error

	GetTSSPFiles(mm string, isOrder bool) (*immutable.TSSPFiles, bool)

	GetIndexBuild() *tsi.IndexBuilder

	GetID() uint64

	Open(client metaclient.MetaClient) error

	DataPath() string

	WalPath() string

	TableStore() immutable.TablesStore

	Ident() *meta.ShardIdentifier

	Duration() *meta.DurationDescriptor

	ChangeShardTierToWarm()

	SetWriteColdDuration(duration time.Duration)

	SetMutableSizeLimit(size int64)

	IsOutOfOrderFilesExist() bool

	DropMeasurement(ctx context.Context, name string) error

	Statistics(buffer []byte) ([]byte, error)

	NewShardKeyIdx(shardType, dataPath string, lockPath *string) error

	GetShardDownSamplePolicy(policy *meta.DownSamplePolicyInfo) *meta.ShardDownSamplePolicyInfo

	SetShardDownSampleLevel(i int)
	UpdateDownSampleOnShard(id uint64, level int)
	NewDownSampleTask(sdsp *meta.ShardDownSamplePolicyInfo, schema []hybridqp.Catalog, log *zap.Logger)
	UpdateShardReadOnly(meta interface {
		UpdateShardDownSampleInfo(Ident *meta.ShardIdentifier) error
	}) error
	DisableDownSample()
	EnableDownSample()

	CanDoDownSample() bool

	CompactionEnabled() bool
	DisableCompAndMerge()
	EnableCompAndMerge()
	Compact() error
	WaitWriteFinish()

	SetIndexBuilder(builder *tsi.IndexBuilder)
	CloseIndexBuilder() error
	Scan(span *tracing.Span, schema *executor.QuerySchema, callBack func(num int64) error) (tsi.GroupSeries, int64, error)
	CreateCursor(ctx context.Context, schema *executor.QuerySchema) ([]comm.KeyCursor, error)
	// contains filtered or unexported methods
}

type ShardTaskContent added in v1.0.0

type ShardTaskContent struct {
	// contains filtered or unexported fields
}

type TagSetCursorItem

type TagSetCursorItem struct {
	// contains filtered or unexported fields
}

func (TagSetCursorItem) GetNewRecord

func (c TagSetCursorItem) GetNewRecord() (*record.Record, error)

type TierInfo

type TierInfo struct {
}

type TsspSequenceReader added in v1.0.0

type TsspSequenceReader struct {
	executor.BaseProcessor

	Output *executor.SeriesRecordPort
	// contains filtered or unexported fields
}

func (*TsspSequenceReader) Close added in v1.0.0

func (r *TsspSequenceReader) Close()

func (*TsspSequenceReader) Create added in v1.0.0

func (*TsspSequenceReader) Explain added in v1.0.0

func (r *TsspSequenceReader) Explain() []executor.ValuePair

func (*TsspSequenceReader) GetInputNumber added in v1.0.0

func (r *TsspSequenceReader) GetInputNumber(executor.Port) int

func (*TsspSequenceReader) GetInputs added in v1.0.0

func (r *TsspSequenceReader) GetInputs() executor.Ports

func (*TsspSequenceReader) GetOutputNumber added in v1.0.0

func (r *TsspSequenceReader) GetOutputNumber(executor.Port) int

func (*TsspSequenceReader) GetOutputs added in v1.0.0

func (r *TsspSequenceReader) GetOutputs() executor.Ports

func (*TsspSequenceReader) IsSink added in v1.0.0

func (r *TsspSequenceReader) IsSink() bool

func (*TsspSequenceReader) Name added in v1.0.0

func (r *TsspSequenceReader) Name() string

func (*TsspSequenceReader) Release added in v1.0.0

func (r *TsspSequenceReader) Release() error

func (*TsspSequenceReader) Work added in v1.0.0

func (r *TsspSequenceReader) Work(ctx context.Context) error

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func NewWAL

func NewWAL(path string, lockPath *string, walSyncInterval time.Duration, walEnabled, replayParallel bool, partitionNum int) *WAL

func (*WAL) Close

func (l *WAL) Close() error

func (*WAL) Remove

func (l *WAL) Remove(files []string) error

func (*WAL) Replay

func (l *WAL) Replay(ctx context.Context, callBack func(binary []byte) error) ([]string, error)

func (*WAL) Switch

func (l *WAL) Switch() ([]string, error)

func (*WAL) Write

func (l *WAL) Write(binary []byte) error

type WalRecordType

type WalRecordType byte
const (
	WriteWALRecord WalRecordType = 0x01
)

type WriteIntoStorageTransform added in v1.0.0

type WriteIntoStorageTransform struct {
	executor.BaseProcessor

	Input  *executor.SeriesRecordPort
	Output *executor.DownSampleStatePort
	// contains filtered or unexported fields
}

func (*WriteIntoStorageTransform) Close added in v1.0.0

func (r *WriteIntoStorageTransform) Close()

func (*WriteIntoStorageTransform) Create added in v1.0.0

func (*WriteIntoStorageTransform) EndFile added in v1.0.0

func (r *WriteIntoStorageTransform) EndFile() error

func (*WriteIntoStorageTransform) Explain added in v1.0.0

func (*WriteIntoStorageTransform) GetClosed added in v1.0.0

func (r *WriteIntoStorageTransform) GetClosed() chan struct{}

func (*WriteIntoStorageTransform) GetInputNumber added in v1.0.0

func (r *WriteIntoStorageTransform) GetInputNumber(executor.Port) int

func (*WriteIntoStorageTransform) GetInputs added in v1.0.0

func (r *WriteIntoStorageTransform) GetInputs() executor.Ports

func (*WriteIntoStorageTransform) GetOutputNumber added in v1.0.0

func (r *WriteIntoStorageTransform) GetOutputNumber(executor.Port) int

func (*WriteIntoStorageTransform) GetOutputs added in v1.0.0

func (r *WriteIntoStorageTransform) GetOutputs() executor.Ports

func (*WriteIntoStorageTransform) GetRowCount added in v1.0.0

func (r *WriteIntoStorageTransform) GetRowCount() int

func (*WriteIntoStorageTransform) Name added in v1.0.0

func (*WriteIntoStorageTransform) Release added in v1.0.0

func (r *WriteIntoStorageTransform) Release() error

func (*WriteIntoStorageTransform) SetTaskId added in v1.0.0

func (r *WriteIntoStorageTransform) SetTaskId(taskID int)

func (*WriteIntoStorageTransform) Work added in v1.0.0

type Writer

type Writer interface {
}

Directories

Path Synopsis
index
ski
tsi

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL