engine

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2022 License: Apache-2.0 Imports: 52 Imported by: 0

Documentation

Overview

Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Copyright 2022 Huawei Cloud Computing Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

View Source
const (
	Failpoint = "failpoint"
	Readonly  = "readonly"
)
View Source
const (
	TierLeveMem           = 1 // in memory
	TierLeveLocalDisk     = 2
	TierLeveObjectStorage = 3
)
View Source
const (
	DefaultFileSize   = 10 * 1024 * 1024
	WALFileSuffixes   = "wal"
	WalRecordHeadSize = 1 + 4
	WalCompBufSize    = 256 * 1024
	WalCompMaxBufSize = 2 * 1024 * 1024
)
View Source
const DataDirectory = "data"
View Source
const IndexFileDirectory = "index"
View Source
const WalDirectory = "wal"

Variables

View Source
var (
	ErrDBNotFound    = fmt.Errorf("database not found")
	ErrPTNotFound    = fmt.Errorf("partition not found")
	ErrRPNotFound    = fmt.Errorf("rp not found")
	ErrShardNotFound = fmt.Errorf("shard not found")
	ErrMMNotFound    = fmt.Errorf("measurment not found")
	ErrShardClosed   = fmt.Errorf("shard closed")
	ErrIndexNotFound = fmt.Errorf("index not found")
	ErrInvalidDir    = fmt.Errorf("shard or index dir not valid")
)
View Source
var (
	AggPool      = record.NewRecordPool()
	SeriesPool   = record.NewRecordPool()
	TsmMergePool = record.NewRecordPool()
)
View Source
var AppendManyNils map[int]func(colVal *record.ColVal, count int)
View Source
var (
	ErrNoSuchParam = fmt.Errorf("no parameter find")
)
View Source
var (
	FileCursorPool = record.NewRecordPool()
)
View Source
var (
	RecordIteratorPool = &sync.Pool{}
)

Functions

func AddLocationsWithFirstTime

func AddLocationsWithFirstTime(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AddLocationsWithInit

func AddLocationsWithInit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) error

func AddLocationsWithLimit

func AddLocationsWithLimit(l *immutable.LocationCursor, files immutable.TableReaders, ctx *idKeyCursorContext, sid uint64) (int64, error)

func AppendColumnTimes

func AppendColumnTimes(bitmap []bool, column executor.Column, columnTimes []int64, recCol *record.ColVal)

func CanNotAggOnSeriesFunc

func CanNotAggOnSeriesFunc(m map[string]*influxql.Call) bool

func GetMaxTime

func GetMaxTime(maxTime int64, rec *record.Record, isAscending bool) int64

func GetMinTime

func GetMinTime(minTime int64, rec *record.Record, isAscending bool) int64

func MatchPreAgg

func MatchPreAgg(schema *executor.QuerySchema, ctx *idKeyCursorContext) bool

func NewAggregateCursor

func NewAggregateCursor(input comm.KeyCursor, schema *executor.QuerySchema, globalPool *record.RecordPool, hasAuxTags bool) *aggregateCursor

func NewChunkReader

func NewChunkReader(outputRowDataType hybridqp.RowDataType, ops []hybridqp.ExprOptions, seriesPlan hybridqp.QueryNode,
	source influxql.Sources, schema *executor.QuerySchema, cursors []interface{}) executor.Processor

func NewEngine

func NewEngine(dataPath, walPath string, options netstorage.EngineOptions, ctx *meta.LoadCtx) (netstorage.Engine, error)

func NewFileLoopCursor

func NewFileLoopCursor(ctx *idKeyCursorContext, span *tracing.Span, schema *executor.QuerySchema,
	tagSet *tsi.TagSetInfo, start, step int, s *shard) *fileLoopCursor

func NewFloatColFloatHeapReducer

func NewFloatColFloatHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, floatHeapItem *FloatHeapItem) *floatColFloatHeapReducer

func NewIntegerColIntegerHeapReducer

func NewIntegerColIntegerHeapReducer(inOrdinal, outOrdinal int, auxProcessors []*auxProcessor, integerHeapItem *IntegerHeapItem) *integerColIntegerHeapReducer

func NewLimitCursor

func NewLimitCursor(schema *executor.QuerySchema, helper func(start, end int, src, des *record.Record)) *limitCursor

func NewRecordSchema

func NewRecordSchema(querySchema *executor.QuerySchema, auxTags []string, schema record.Schemas, filterConditions []*influxql.VarRef) ([]string, record.Schemas)

func NewSeriesInfoPool

func NewSeriesInfoPool(num int64) *filesInfoPool

func NewShard

func NewShard(dataPath string, walPath string, ident *meta.ShardIdentifier, indexBuilder *tsi.IndexBuilder, durationInfo *meta.DurationDescriptor, tr *meta.TimeRangeInfo,
	options netstorage.EngineOptions) *shard

func NewTagSetCursorForTest

func NewTagSetCursorForTest(schema *executor.QuerySchema, seriesN int) *tagSetCursor

NewTagSetCursorForTest for ut test, will remove later

func NewTopNLinkedList

func NewTopNLinkedList(n int, ascending bool) *topNLinkedList

func NewTsmMergeCursor

func NewTsmMergeCursor(ctx *idKeyCursorContext, sid uint64, filter influxql.Expr, tags *influx.PointTags, _ *tracing.Span) (*tsmMergeCursor, error)

func RecordCutNormal

func RecordCutNormal(start, end int, src, des *record.Record)

func SetFullCompColdDuration

func SetFullCompColdDuration(d time.Duration)

Types

type AggTagSetCursor

type AggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewAggTagSetCursor

func NewAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor) *AggTagSetCursor

func (*AggTagSetCursor) Close

func (s *AggTagSetCursor) Close() error

func (*AggTagSetCursor) EndSpan

func (s *AggTagSetCursor) EndSpan()

func (*AggTagSetCursor) GetSchema

func (s *AggTagSetCursor) GetSchema() record.Schemas

func (*AggTagSetCursor) Init

func (s *AggTagSetCursor) Init()

func (*AggTagSetCursor) Name

func (s *AggTagSetCursor) Name() string

func (*AggTagSetCursor) Next

func (*AggTagSetCursor) NextAggData

func (s *AggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*AggTagSetCursor) RecordInit

func (s *AggTagSetCursor) RecordInit() error

func (*AggTagSetCursor) SetOps

func (s *AggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*AggTagSetCursor) SetSchema

func (s *AggTagSetCursor) SetSchema(schema record.Schemas)

func (*AggTagSetCursor) SinkPlan

func (s *AggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*AggTagSetCursor) StartSpan

func (s *AggTagSetCursor) StartSpan(span *tracing.Span)

func (*AggTagSetCursor) TagAuxHandler

func (s *AggTagSetCursor) TagAuxHandler(re *record.Record, start, end int)

func (*AggTagSetCursor) TimeWindowsInit

func (s *AggTagSetCursor) TimeWindowsInit()

func (*AggTagSetCursor) UpdateRec2Window

func (s *AggTagSetCursor) UpdateRec2Window(index int, i int)

type ChunkReader

type ChunkReader struct {
	executor.BaseProcessor

	Output *executor.ChunkPort

	ResultChunkPool *executor.CircularChunkPool
	// contains filtered or unexported fields
}

func (*ChunkReader) Close

func (r *ChunkReader) Close()

func (*ChunkReader) Create

func (*ChunkReader) Explain

func (r *ChunkReader) Explain() []executor.ValuePair

func (*ChunkReader) GetInputNumber

func (r *ChunkReader) GetInputNumber(executor.Port) int

func (*ChunkReader) GetInputs

func (r *ChunkReader) GetInputs() executor.Ports

func (*ChunkReader) GetOutputNumber

func (r *ChunkReader) GetOutputNumber(executor.Port) int

func (*ChunkReader) GetOutputs

func (r *ChunkReader) GetOutputs() executor.Ports

func (*ChunkReader) IsSink

func (r *ChunkReader) IsSink() bool

func (*ChunkReader) Name

func (r *ChunkReader) Name() string

func (*ChunkReader) Release

func (r *ChunkReader) Release() error

func (*ChunkReader) Work

func (r *ChunkReader) Work(ctx context.Context) error

type CoProcessor

type CoProcessor interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type CoProcessorImpl

type CoProcessorImpl struct {
	Routines []Routine
}

func NewCoProcessorImpl

func NewCoProcessorImpl(routines ...Routine) *CoProcessorImpl

func (*CoProcessorImpl) AppendRoutine

func (p *CoProcessorImpl) AppendRoutine(routines ...Routine)

func (*CoProcessorImpl) WorkOnRecord

func (p *CoProcessorImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

func NewCompactor

func NewCompactor() *Compactor

func (*Compactor) RegisterShard

func (c *Compactor) RegisterShard(sh *shard)

func (*Compactor) SetAllOutOfOrderMergeSwitch

func (c *Compactor) SetAllOutOfOrderMergeSwitch(en bool)

func (*Compactor) SetAllShardsCompactionSwitch

func (c *Compactor) SetAllShardsCompactionSwitch(en bool)

func (*Compactor) SetSnapshotColdDuration

func (c *Compactor) SetSnapshotColdDuration(d time.Duration)

func (*Compactor) ShardCompactionSwitch

func (c *Compactor) ShardCompactionSwitch(shid uint64, en bool)

func (*Compactor) ShardOutOfOrderMergeSwitch

func (c *Compactor) ShardOutOfOrderMergeSwitch(shid uint64, en bool)

func (*Compactor) UnregisterShard

func (c *Compactor) UnregisterShard(shardId uint64)

type DBPTInfo

type DBPTInfo struct {
	// contains filtered or unexported fields
}

func NewDBPTInfo

func NewDBPTInfo(db string, id uint32, dataPath, walPath string, ctx *metaclient.LoadCtx) *DBPTInfo

func (*DBPTInfo) LoadAllShards

func (dbPT *DBPTInfo) LoadAllShards(rp string, shardIDs []uint64) error

func (*DBPTInfo) LockFile

func (dbPT *DBPTInfo) LockFile() string

func (*DBPTInfo) NewShard

func (dbPT *DBPTInfo) NewShard(rp string, shardID uint64, timeRangeInfo *meta.ShardTimeRangeInfo) (Shard, error)

func (*DBPTInfo) OpenIndexes

func (dbPT *DBPTInfo) OpenIndexes(rp string) error

func (*DBPTInfo) OpenShards

func (dbPT *DBPTInfo) OpenShards(rp string, durationInfos map[uint64]*meta.ShardDurationInfo) error

func (*DBPTInfo) SetOption

func (dbPT *DBPTInfo) SetOption(opt netstorage.EngineOptions)

func (*DBPTInfo) Shard

func (dbPT *DBPTInfo) Shard(id uint64) Shard

type DataBlockInfo

type DataBlockInfo struct {
	// contains filtered or unexported fields
}

type EndPointPair

type EndPointPair struct {
	Record  *record.Record
	Ordinal int
}

type Engine

type Engine struct {
	ReadOnly bool

	DBPartitions map[string]map[uint32]*DBPTInfo
	// contains filtered or unexported fields
}

func (*Engine) ChangeShardTierToWarm

func (e *Engine) ChangeShardTierToWarm(db string, ptId uint32, shardID uint64) error

func (*Engine) Close

func (e *Engine) Close() error

func (*Engine) CreateDBPT

func (e *Engine) CreateDBPT(db string, pt uint32)

func (*Engine) CreateLogicalPlan

func (e *Engine) CreateLogicalPlan(ctx context.Context, db string, ptId uint32, shardID uint64,
	sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

func (*Engine) CreateShard

func (e *Engine) CreateShard(db, rp string, ptId uint32, shardID uint64, timeRangeInfo *meta2.ShardTimeRangeInfo) error

func (*Engine) DbPTRef

func (e *Engine) DbPTRef(db string, ptId uint32) error

func (*Engine) DbPTUnref

func (e *Engine) DbPTUnref(db string, ptId uint32)

func (*Engine) DeleteDatabase

func (e *Engine) DeleteDatabase(db string, ptId uint32) (err error)

func (*Engine) DeleteIndex

func (e *Engine) DeleteIndex(db string, ptId uint32, indexID uint64) error

func (*Engine) DeleteShard

func (e *Engine) DeleteShard(db string, ptId uint32, shardID uint64) error

todo:need confirm

func (*Engine) DropMeasurement

func (e *Engine) DropMeasurement(db string, rp string, name string, shardIds []uint64) error

func (*Engine) DropRetentionPolicy

func (e *Engine) DropRetentionPolicy(db string, rp string, ptId uint32) error

func (*Engine) DropSeries

func (e *Engine) DropSeries(database string, sources []influxql.Source, ptId []uint32, condition influxql.Expr) (int, error)

func (*Engine) ExpiredIndexes

func (e *Engine) ExpiredIndexes() []*meta2.IndexIdentifier

func (*Engine) ExpiredShards

func (e *Engine) ExpiredShards() []*meta2.ShardIdentifier

func (*Engine) FetchShardsNeedChangeStore

func (e *Engine) FetchShardsNeedChangeStore() (shardsToWarm, shardsToCold []*meta2.ShardIdentifier)

func (*Engine) ForceFlush

func (e *Engine) ForceFlush()

func (*Engine) GetLockFile

func (e *Engine) GetLockFile() string

func (*Engine) GetShardSplitPoints

func (e *Engine) GetShardSplitPoints(db string, ptId uint32, shardID uint64, idxes []int64) ([]string, error)

func (*Engine) LogicalPlanCost

func (e *Engine) LogicalPlanCost(db string, ptId uint32, sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

func (*Engine) Open

func (e *Engine) Open(ptIds []uint32, durationInfos map[uint64]*meta2.ShardDurationInfo) error

func (*Engine) SeriesCardinality

func (e *Engine) SeriesCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) ([]meta2.MeasurementCardinalityInfo, error)

func (*Engine) SeriesExactCardinality

func (e *Engine) SeriesExactCardinality(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) (map[string]uint64, error)

func (*Engine) SeriesKeys

func (e *Engine) SeriesKeys(db string, ptIDs []uint32, measurements [][]byte, condition influxql.Expr) ([]string, error)

func (*Engine) SetReadOnly

func (e *Engine) SetReadOnly(readonly bool)

func (*Engine) Statistics

func (e *Engine) Statistics(buffer []byte) ([]byte, error)

func (*Engine) SysCtrl

func (e *Engine) SysCtrl(req *netstorage.SysCtrlRequest) error

func (*Engine) TagValues

func (e *Engine) TagValues(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr) (netstorage.TablesTagSets, error)

func (*Engine) TagValuesCardinality

func (e *Engine) TagValuesCardinality(db string, ptIDs []uint32, tagKeys map[string][][]byte, condition influxql.Expr) (map[string]uint64, error)

func (*Engine) UpdateShardDurationInfo

func (e *Engine) UpdateShardDurationInfo(info *meta2.ShardDurationInfo) error

func (*Engine) WriteRows

func (e *Engine) WriteRows(db, rp string, ptId uint32, shardID uint64, rows []influx.Row, binaryRows []byte) error

type FloatHeapItem

type FloatHeapItem struct {
	// contains filtered or unexported fields
}

func NewFloatHeapItem

func NewFloatHeapItem(n int, cmpByValue, cmpByTime func(a, b *FloatPointItem) bool) *FloatHeapItem

func (*FloatHeapItem) Len

func (f *FloatHeapItem) Len() int

func (*FloatHeapItem) Less

func (f *FloatHeapItem) Less(i, j int) bool

func (*FloatHeapItem) Pop

func (f *FloatHeapItem) Pop() interface{}

func (*FloatHeapItem) Push

func (f *FloatHeapItem) Push(x interface{})

func (*FloatHeapItem) Reset

func (f *FloatHeapItem) Reset()

func (*FloatHeapItem) Swap

func (f *FloatHeapItem) Swap(i, j int)

type FloatPointItem

type FloatPointItem struct {
	// contains filtered or unexported fields
}

func NewFloatPointItem

func NewFloatPointItem(time int64, value float64) *FloatPointItem

type IntegerHeapItem

type IntegerHeapItem struct {
	// contains filtered or unexported fields
}

func NewIntegerHeapItem

func NewIntegerHeapItem(n int, cmpByValue, cmpByTime func(a, b *IntegerPointItem) bool) *IntegerHeapItem

func (*IntegerHeapItem) Len

func (f *IntegerHeapItem) Len() int

func (*IntegerHeapItem) Less

func (f *IntegerHeapItem) Less(i, j int) bool

func (*IntegerHeapItem) Pop

func (f *IntegerHeapItem) Pop() interface{}

func (*IntegerHeapItem) Push

func (f *IntegerHeapItem) Push(x interface{})

func (*IntegerHeapItem) Reset

func (f *IntegerHeapItem) Reset()

func (*IntegerHeapItem) Swap

func (f *IntegerHeapItem) Swap(i, j int)

type IntegerPointItem

type IntegerPointItem struct {
	// contains filtered or unexported fields
}

func NewIntegerPointItem

func NewIntegerPointItem(time int64, value int64) *IntegerPointItem

type LogWriter

type LogWriter struct {
	SyncInterval time.Duration
	// contains filtered or unexported fields
}

func (*LogWriter) Switch

func (w *LogWriter) Switch() ([]string, error)

func (*LogWriter) Write

func (w *LogWriter) Write(compBuf []byte) error

type PreAggTagSetCursor

type PreAggTagSetCursor struct {
	// contains filtered or unexported fields
}

func NewPreAggTagSetCursor

func NewPreAggTagSetCursor(schema *executor.QuerySchema, ctx *idKeyCursorContext, itr comm.KeyCursor) *PreAggTagSetCursor

func (*PreAggTagSetCursor) Close

func (s *PreAggTagSetCursor) Close() error

func (*PreAggTagSetCursor) EndSpan

func (s *PreAggTagSetCursor) EndSpan()

func (*PreAggTagSetCursor) GetSchema

func (s *PreAggTagSetCursor) GetSchema() record.Schemas

func (*PreAggTagSetCursor) Name

func (s *PreAggTagSetCursor) Name() string

func (*PreAggTagSetCursor) Next

func (*PreAggTagSetCursor) NextAggData

func (s *PreAggTagSetCursor) NextAggData() (*record.Record, *comm.FileInfo, error)

func (*PreAggTagSetCursor) RecordInitPreAgg

func (s *PreAggTagSetCursor) RecordInitPreAgg() error

func (*PreAggTagSetCursor) SetOps

func (s *PreAggTagSetCursor) SetOps(ops []*comm.CallOption)

func (*PreAggTagSetCursor) SetSchema

func (s *PreAggTagSetCursor) SetSchema(schema record.Schemas)

func (*PreAggTagSetCursor) SinkPlan

func (s *PreAggTagSetCursor) SinkPlan(plan hybridqp.QueryNode)

func (*PreAggTagSetCursor) StartSpan

func (s *PreAggTagSetCursor) StartSpan(span *tracing.Span)

func (*PreAggTagSetCursor) TimeWindowsInit

func (s *PreAggTagSetCursor) TimeWindowsInit()

type PtNNLock

type PtNNLock struct {
}

type Reducer

type Reducer interface {
	Aggregate(*ReducerEndpoint, *ReducerParams)
}

type ReducerEndpoint

type ReducerEndpoint struct {
	InputPoint  EndPointPair
	OutputPoint EndPointPair
}

type ReducerParams

type ReducerParams struct {
	// contains filtered or unexported fields
}

type Routine

type Routine interface {
	WorkOnRecord(*record.Record, *record.Record, *ReducerParams)
}

type RoutineImpl

type RoutineImpl struct {
	// contains filtered or unexported fields
}

func NewRoutineImpl

func NewRoutineImpl(reducer Reducer, inOrdinal int, outOrdinal int) *RoutineImpl

func (*RoutineImpl) WorkOnRecord

func (r *RoutineImpl) WorkOnRecord(in *record.Record, out *record.Record, params *ReducerParams)

type SeriesIter

type SeriesIter struct {
	// contains filtered or unexported fields
}

type Shard

type Shard interface {
	WriteRows(rows []influx.Row, binaryRows []byte) error

	ForceFlush()

	MaxTime() int64

	Count() uint64

	SeriesCount() int

	Close() error

	Ref()

	UnRef()

	CreateLogicalPlan(ctx context.Context, sources influxql.Sources, schema *executor.QuerySchema) (hybridqp.QueryNode, error)

	LogicalPlanCost(sources influxql.Sources, opt query.ProcessorOptions) (hybridqp.LogicalPlanCost, error)

	GetSplitPoints(idxes []int64) ([]string, error)

	Expired() bool

	TierDurationExpired() (tier uint64, expired bool)

	RPName() string

	GetIndexBuild() *tsi.IndexBuilder

	GetID() uint64

	Open() error

	DataPath() string

	WalPath() string

	TableStore() immutable.TablesStore

	Ident() *meta.ShardIdentifier

	Duration() *meta.DurationDescriptor

	ChangeShardTierToWarm()

	SetWriteColdDuration(duration time.Duration)

	SetMutableSizeLimit(size int64)

	DropMeasurement(ctx context.Context, name string) error

	Statistics(buffer []byte) ([]byte, error)

	NewShardKeyIdx(shardType, dataPath string) error
}

type TagSetCursorItem

type TagSetCursorItem struct {
	// contains filtered or unexported fields
}

func (TagSetCursorItem) GetNewRecord

func (c TagSetCursorItem) GetNewRecord() (*record.Record, error)

type TierInfo

type TierInfo struct {
}

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

func NewWAL

func NewWAL(path string, walSyncInterval time.Duration, walEnabled, replayParallel bool, partitionNum int) *WAL

func (*WAL) Close

func (l *WAL) Close() error

func (*WAL) Remove

func (l *WAL) Remove(files []string) error

func (*WAL) Replay

func (l *WAL) Replay(callBack func(binary []byte) error) ([]string, error)

func (*WAL) Switch

func (l *WAL) Switch() ([]string, error)

func (*WAL) Write

func (l *WAL) Write(binary []byte) error

type WalRecordType

type WalRecordType byte
const (
	WriteWALRecord WalRecordType = 0x01
)

type Writer

type Writer interface {
}

Directories

Path Synopsis
index
ski
tsi

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL