colexec

package
v0.8.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 30, 2023 License: Apache-2.0 Imports: 35 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// 'mo_indexes' table
	MO_INDEX_ID               = "id"
	MO_INDEX_TABLE_ID         = "table_id"
	MO_INDEX_DATABASE_ID      = "database_id"
	MO_INDEX_NAME             = "name"
	MO_INDEX_TYPE             = "type"
	MO_INDEX_IS_VISIBLE       = "is_visible"
	MO_INDEX_HIDDEN           = "hidden"
	MO_INDEX_COMMENT          = "comment"
	MO_INDEX_OPTIONS          = "options"
	MO_INDEX_COLUMN_NAME      = "column_name"
	MO_INDEX_ORDINAL_POSITION = "ordinal_position"
	MO_INDEX_TABLE_NAME       = "index_table_name"
	MO_INDEX_PRIKEY           = catalog.CPrimaryKeyColName
)
View Source
const (
	INDEX_TYPE_PRIMARY  = "PRIMARY"
	INDEX_TYPE_UNIQUE   = "UNIQUE"
	INDEX_TYPE_MULTIPLE = "MULTIPLE"
)
View Source
const (
	// WriteS3Threshold when batches'  size of table reaches this, we will
	// trigger write s3
	WriteS3Threshold uint64 = 64 * mpool.MB

	TagS3Size            uint64 = 10 * mpool.MB
	TagS3SizeForMOLogger uint64 = 1 * mpool.MB
)
View Source
const (
	TxnWorkSpaceIdType = 1
	CnBlockIdType      = 2
)
View Source
const (
	ALLOCID_INDEX_KEY = "index_key"
)

Variables

Column type mapping of table 'mo_indexes'

Functions

func BatchDataNotNullCheck added in v0.8.0

func BatchDataNotNullCheck(tmpBat *batch.Batch, tableDef *plan.TableDef, ctx context.Context) error

func EvalExpressionOnce added in v0.8.0

func EvalExpressionOnce(proc *process.Process, planExpr *plan.Expr, batches []*batch.Batch) (*vector.Vector, error)

func EvaluateFilterByZoneMap added in v0.8.0

func EvaluateFilterByZoneMap(
	ctx context.Context,
	proc *process.Process,
	expr *plan.Expr,
	meta objectio.ColumnMetaFetcher,
	columnMap map[int]int,
	zms []objectio.ZoneMap,
	vecs []*vector.Vector) (selected bool)

func FilterRowIdForDel added in v0.8.0

func FilterRowIdForDel(proc *process.Process, bat *batch.Batch, idx int) (*batch.Batch, error)

func FixProjectionResult added in v0.8.0

func FixProjectionResult(proc *process.Process, executors []ExpressionExecutor,
	rbat *batch.Batch, sbat *batch.Batch) (dupSize int, err error)

FixProjectionResult sets the result vectors for rbat; sbat is the source batch.

func GetExprZoneMap added in v0.8.0

func GetExprZoneMap(
	ctx context.Context,
	proc *process.Process,
	expr *plan.Expr,
	meta objectio.ColumnMetaFetcher,
	columnMap map[int]int,
	zms []objectio.ZoneMap,
	vecs []*vector.Vector) (v objectio.ZoneMap)

func GetNewRelation added in v0.6.0

func GetNewRelation(eg engine.Engine, dbName, tbleName string, txn client.TxnOperator, ctx context.Context) (engine.Relation, error)

func GroupByPartitionForDelete added in v0.8.0

func GroupByPartitionForDelete(proc *process.Process, bat *batch.Batch, idx int, pIdx int, partitionNum int) ([]*batch.Batch, error)

GroupByPartitionForDelete groups data by partition and returns a batch array whose length equals the number of partitions. Data from the same partition is placed in the same batch.

func GroupByPartitionForInsert added in v0.8.0

func GroupByPartitionForInsert(proc *process.Process, bat *batch.Batch, attrs []string, pIdx int, partitionNum int) ([]*batch.Batch, error)

GroupByPartitionForInsert groups data by partition and returns a batch array whose length equals the number of partitions. Data from the same partition is placed in the same batch.

func InsertIndexMetadata added in v0.8.0

func InsertIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string) error

InsertIndexMetadata synchronizes the table's index metadata into the index metadata table.

func InsertOneIndexMetadata added in v0.8.0

func InsertOneIndexMetadata(eg engine.Engine, ctx context.Context, db engine.Database, proc *process.Process, tblName string, idxdef *plan.IndexDef) error

InsertOneIndexMetadata synchronizes the metadata of a single index into the index metadata table.

func NewJoinBatch added in v0.8.0

func NewJoinBatch(bat *batch.Batch, mp *mpool.MPool) (*batch.Batch,
	[]func(*vector.Vector, *vector.Vector, int64, int) error)

func RewriteFilterExprList

func RewriteFilterExprList(list []*plan.Expr) *plan.Expr

RewriteFilterExprList converts an expression list into a single AndExpr.

func SafeGetResult added in v0.8.0

func SafeGetResult(proc *process.Process, vec *vector.Vector, executor ExpressionExecutor) (*vector.Vector, error)

This function is scheduled for removal; do not use it.

func SetJoinBatchValues added in v0.8.0

func SetJoinBatchValues(joinBat, bat *batch.Batch, sel int64, length int,
	cfs []func(*vector.Vector, *vector.Vector, int64, int) error) error

func SortInFilter added in v0.8.0

func SortInFilter(vec *vector.Vector)

func SplitAndExprs added in v0.6.0

func SplitAndExprs(list []*plan.Expr) []*plan.Expr

Types

type CnSegmentMap added in v0.8.0

type CnSegmentMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type ColumnExpressionExecutor added in v0.8.0

type ColumnExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*ColumnExpressionExecutor) Eval added in v0.8.0

func (expr *ColumnExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ColumnExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *ColumnExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ColumnExpressionExecutor) Free added in v0.8.0

func (expr *ColumnExpressionExecutor) Free()

func (*ColumnExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *ColumnExpressionExecutor) IsColumnExpr() bool

type ExpressionExecutor added in v0.8.0

type ExpressionExecutor interface {
	// Eval will return the result vector of expression.
	// the result memory is reused, so it should not be modified or saved.
	// If it needs, it should be copied by vector.Dup().
	Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

	// EvalWithoutResultReusing is the same as Eval, but it will not reuse the memory of result vector.
	// so you can save the result vector directly. but should be careful about memory leak.
	// and watch out that maybe the vector is one of the input vectors of batches.
	EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

	// Free should release all memory of executor.
	// it will be called after query has done.
	Free()

	IsColumnExpr() bool
	// contains filtered or unexported methods
}

ExpressionExecutor generated from plan.Expr, can evaluate the result from vectors directly.

func NewExpressionExecutor added in v0.8.0

func NewExpressionExecutor(proc *process.Process, planExpr *plan.Expr) (ExpressionExecutor, error)

func NewExpressionExecutorsFromPlanExpressions added in v0.8.0

func NewExpressionExecutorsFromPlanExpressions(proc *process.Process, planExprs []*plan.Expr) (executors []ExpressionExecutor, err error)

type FixedVectorExpressionExecutor added in v0.8.0

type FixedVectorExpressionExecutor struct {
	// contains filtered or unexported fields
}

FixedVectorExpressionExecutor is an executor whose vector content is fixed, e.g.:

ConstVector [1, 1, 1, 1, 1]
ConstVector [null, null, null]
ListVector  ["1", "2", "3", null, "5"]

func (*FixedVectorExpressionExecutor) Eval added in v0.8.0

func (expr *FixedVectorExpressionExecutor) Eval(_ *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FixedVectorExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *FixedVectorExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FixedVectorExpressionExecutor) Free added in v0.8.0

func (expr *FixedVectorExpressionExecutor) Free()

func (*FixedVectorExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *FixedVectorExpressionExecutor) IsColumnExpr() bool

type FunctionExpressionExecutor added in v0.8.0

type FunctionExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*FunctionExpressionExecutor) Eval added in v0.8.0

func (expr *FunctionExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FunctionExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *FunctionExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*FunctionExpressionExecutor) Free added in v0.8.0

func (expr *FunctionExpressionExecutor) Free()

func (*FunctionExpressionExecutor) Init added in v0.8.0

func (expr *FunctionExpressionExecutor) Init(
	proc *process.Process,
	parameterNum int,
	retType types.Type,
	fn func(
		params []*vector.Vector,
		result vector.FunctionResultWrapper,
		proc *process.Process,
		length int) error) (err error)

func (*FunctionExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *FunctionExpressionExecutor) IsColumnExpr() bool

func (*FunctionExpressionExecutor) SetParameter added in v0.8.0

func (expr *FunctionExpressionExecutor) SetParameter(index int, executor ExpressionExecutor)

type Merge added in v0.8.0

type Merge[T any] struct {
	// contains filtered or unexported fields
}

Merge sorts by the primary key or cluster-by key, so only one vector from each batch is needed.

func NewMerge added in v0.8.0

func NewMerge[T any](size int, compLess func([]T, int64, int64) bool, cols [][]T, nulls []*nulls.Nulls) (merge *Merge[T])

func (*Merge[T]) GetNextPos added in v0.8.0

func (merge *Merge[T]) GetNextPos() (batchIndex, rowIndex, size int)

func (*Merge[T]) InitHeap added in v0.8.0

func (merge *Merge[T]) InitHeap()

func (*Merge[T]) Len added in v0.8.0

func (merge *Merge[T]) Len() int

func (*Merge[T]) Less added in v0.8.0

func (merge *Merge[T]) Less(i, j int) bool

type MergeHeap added in v0.8.0

type MergeHeap[T any] struct {
	// contains filtered or unexported fields
}

MergeHeap follows the nulls-first ordering rule.

func NewMergeHeap added in v0.8.0

func NewMergeHeap[T any](cap_size uint64, cmp func([]T, int64, int64) bool) *MergeHeap[T]

func (*MergeHeap[T]) Pop added in v0.8.0

func (heap *MergeHeap[T]) Pop() (data *MixData[T])

func (*MergeHeap[T]) Push added in v0.8.0

func (heap *MergeHeap[T]) Push(data *MixData[T])

type MergeInterface added in v0.8.0

type MergeInterface interface {
	GetNextPos() (int, int, int)
}

type MixData added in v0.8.0

type MixData[T any] struct {
	// contains filtered or unexported fields
}

type ParamExpressionExecutor added in v0.8.0

type ParamExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*ParamExpressionExecutor) Eval added in v0.8.0

func (expr *ParamExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ParamExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *ParamExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*ParamExpressionExecutor) Free added in v0.8.0

func (expr *ParamExpressionExecutor) Free()

func (*ParamExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *ParamExpressionExecutor) IsColumnExpr() bool

type ReceiveInfo added in v0.7.0

type ReceiveInfo struct {
	// it's useless
	NodeAddr string
	Uuid     uuid.UUID
}

ReceiveInfo specifies which node and which registers you need.

type ReceiverOperator added in v0.8.0

type ReceiverOperator struct {
	// contains filtered or unexported fields
}

ReceiverOperator receives batches from proc.Reg.MergeReceivers.

func (*ReceiverOperator) FreeAllReg added in v0.8.0

func (r *ReceiverOperator) FreeAllReg()

func (*ReceiverOperator) FreeMergeTypeOperator added in v0.8.0

func (r *ReceiverOperator) FreeMergeTypeOperator(failed bool)

func (*ReceiverOperator) FreeSingleReg added in v0.8.0

func (r *ReceiverOperator) FreeSingleReg(regIdx int)

FreeSingleReg cleans up any batches left in the register's channel.

func (*ReceiverOperator) InitReceiver added in v0.8.0

func (r *ReceiverOperator) InitReceiver(proc *process.Process, isMergeType bool)

isMergeType indicates whether the receiver operator receives batches from all registers, or from a single register in some order. Merge/MergeGroup/MergeLimit ... are Merge-Type operators, while Join/Intersect/Minus ... are not.

func (*ReceiverOperator) ReceiveFromAllRegs added in v0.8.0

func (r *ReceiverOperator) ReceiveFromAllRegs(analyze process.Analyze) (*batch.Batch, bool, error)

You MUST initialize the ReceiverOperator as Merge-Type (isMergeType = true in InitReceiver) before using this function.

func (*ReceiverOperator) ReceiveFromSingleReg added in v0.8.0

func (r *ReceiverOperator) ReceiveFromSingleReg(regIdx int, analyze process.Analyze) (*batch.Batch, bool, error)

type ResultPos added in v0.6.0

type ResultPos struct {
	Rel int32
	Pos int32
}

func NewResultPos added in v0.6.0

func NewResultPos(rel int32, pos int32) ResultPos

type RuntimeFilterChan added in v0.8.0

type RuntimeFilterChan struct {
	Spec *plan.RuntimeFilterSpec
	Chan chan *pipeline.RuntimeFilter
}

type S3Writer added in v0.8.0

type S3Writer struct {

	// Bats[i] used to store the batches of table
	// Each batch in Bats will be sorted internally, and all batches correspond to only one table
	// when the batches' size is over 64M, we will use merge sort, and then write a segment in s3
	Bats []*batch.Batch
	// contains filtered or unexported fields
}

S3Writer writes table data to S3 and wraps a series of `BlockWriter` write operations.

func AllocPartitionS3Writer added in v0.8.0

func AllocPartitionS3Writer(proc *process.Process, tableDef *plan.TableDef) ([]*S3Writer, error)

AllocPartitionS3Writer allocates S3 writers for a partitioned table.

func AllocS3Writer added in v0.8.0

func AllocS3Writer(proc *process.Process, tableDef *plan.TableDef) (*S3Writer, error)

func (*S3Writer) Free added in v0.8.0

func (w *S3Writer) Free(proc *process.Process)

func (*S3Writer) GenerateWriter added in v0.8.0

func (w *S3Writer) GenerateWriter(proc *process.Process) (objectio.ObjectName, error)

func (*S3Writer) GetMetaLocBat added in v0.8.0

func (w *S3Writer) GetMetaLocBat() *batch.Batch

func (*S3Writer) Init added in v0.8.0

func (w *S3Writer) Init(proc *process.Process)

func (*S3Writer) InitBuffers added in v0.8.0

func (w *S3Writer) InitBuffers(proc *process.Process, bat *batch.Batch)

func (*S3Writer) Output added in v0.8.0

func (w *S3Writer) Output(proc *process.Process) error

func (*S3Writer) Put added in v0.8.0

func (w *S3Writer) Put(bat *batch.Batch, proc *process.Process) bool

Put appends the batch to w.bats, ensuring that each batch in w.bats contains options.DefaultBlockMaxRows rows, except for the last one.
It returns:
true: tableBatches[idx] is over the threshold
false: tableBatches[idx] is less than or equal to the threshold

func (*S3Writer) ResetMetaLocBat added in v0.8.0

func (w *S3Writer) ResetMetaLocBat(proc *process.Process)

func (*S3Writer) SetMp added in v0.8.0

func (w *S3Writer) SetMp(attrs []*engine.Attribute)

func (*S3Writer) SetSortIdx added in v0.8.0

func (w *S3Writer) SetSortIdx(sortIdx int)

func (*S3Writer) SortAndFlush added in v0.8.0

func (w *S3Writer) SortAndFlush(proc *process.Process) error

func (*S3Writer) WriteBlock added in v0.8.0

func (w *S3Writer) WriteBlock(bat *batch.Batch) error

func (*S3Writer) WriteEndBlocks added in v0.8.0

func (w *S3Writer) WriteEndBlocks(proc *process.Process) ([]catalog.BlockInfo, error)

WriteEndBlocks writes the buffered batches to the fileservice (i.e. S3 in this feature), obtains the metadata of the blocks written to the fileservice, and puts it into metaLocBat. For more information, refer to the comment on func WriteEnd in the Writer interface.

func (*S3Writer) WriteS3Batch added in v0.8.0

func (w *S3Writer) WriteS3Batch(proc *process.Process, bat *batch.Batch) error

WriteS3Batch logic: S3Writer caches batches in memory. When their total size reaches 10M (TagS3Size), it sets a tag indicating that the data needs to be written to S3, but it does not write immediately. It keeps buffering until there is no more data or the size reaches 64M (WriteS3Threshold), at which point it triggers the S3 write.

func (*S3Writer) WriteS3CacheBatch added in v0.8.0

func (w *S3Writer) WriteS3CacheBatch(proc *process.Process) error

type Server added in v0.7.0

type Server struct {
	sync.Mutex

	CNSegmentId   types.Uuid
	InitSegmentId bool
	// contains filtered or unexported fields
}

TODO: remove batchCntMap once the dispatch executor uses the stream correctly.

Server is used to support cn2s3 directly; for more information, refer to the cn2s3 documentation.

var Srv *Server

func NewServer added in v0.7.0

func NewServer(client logservice.CNHAKeeperClient) *Server

func (*Server) DeleteTxnSegmentIds added in v0.8.0

func (srv *Server) DeleteTxnSegmentIds(sids []objectio.Segmentid)

func (*Server) GenerateSegment added in v0.7.0

func (srv *Server) GenerateSegment() objectio.ObjectName

The segment ID is part of the ID used for cn2s3 directly; for more information, refer to the cn2s3 documentation.

func (*Server) GetCnSegmentMap added in v0.8.0

func (srv *Server) GetCnSegmentMap() map[string]int32

func (*Server) GetCnSegmentType added in v0.8.0

func (srv *Server) GetCnSegmentType(sid *objectio.Segmentid) int32

func (*Server) GetConnector added in v0.7.0

func (srv *Server) GetConnector(id uint64) *process.WaitRegister

func (*Server) GetNotifyChByUuid added in v0.7.0

func (srv *Server) GetNotifyChByUuid(u uuid.UUID) (*process.Process, bool)

func (*Server) PutCnSegment added in v0.8.0

func (srv *Server) PutCnSegment(sid *objectio.Segmentid, segmentType int32)

func (*Server) PutNotifyChIntoUuidMap added in v0.7.0

func (srv *Server) PutNotifyChIntoUuidMap(u uuid.UUID, p *process.Process) error

func (*Server) RegistConnector added in v0.7.0

func (srv *Server) RegistConnector(reg *process.WaitRegister) uint64

type UuidProcMap added in v0.8.0

type UuidProcMap struct {
	sync.Mutex
	// contains filtered or unexported fields
}

type VarExpressionExecutor added in v0.8.0

type VarExpressionExecutor struct {
	// contains filtered or unexported fields
}

func (*VarExpressionExecutor) Eval added in v0.8.0

func (expr *VarExpressionExecutor) Eval(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*VarExpressionExecutor) EvalWithoutResultReusing added in v0.8.0

func (expr *VarExpressionExecutor) EvalWithoutResultReusing(proc *process.Process, batches []*batch.Batch) (*vector.Vector, error)

func (*VarExpressionExecutor) Free added in v0.8.0

func (expr *VarExpressionExecutor) Free()

func (*VarExpressionExecutor) IsColumnExpr added in v0.8.0

func (expr *VarExpressionExecutor) IsColumnExpr() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL