Documentation
¶
Index ¶
- Constants
- Variables
- func IsHashJoinV2Supported() bool
- func JoinerType(j Joiner) logicalop.JoinType
- func NewConcurrentMapHashTable() *concurrentMapHashTable
- func NewJoinRuntimeStats() *joinRuntimeStats
- type BaseHashTable
- type BuildWorkerV1
- type BuildWorkerV2
- type HashContext
- type HashJoinCtxV1
- type HashJoinCtxV2
- type HashJoinV1Exec
- type HashJoinV2Exec
- type IndexJoinExecutorBuilder
- type IndexJoinLookUpContent
- type IndexLookUpJoin
- type IndexLookUpMergeJoin
- type IndexNestedLoopHashJoin
- type InnerCtx
- type InnerMergeCtx
- type IterCb
- type Joiner
- type MergeJoinExec
- type MergeJoinTable
- type NAAJType
- type NestedLoopApplyExec
- type OuterCtx
- type OuterMergeCtx
- type ProbeSideTupleFetcherV1
- type ProbeSideTupleFetcherV2
- type ProbeV2
- type ProbeWorkerV1
- type ProbeWorkerV2
- type UpsertCb
Constants ¶
const (
	// OneInt64 means the key contains only one Int64
	OneInt64 keyMode = iota
	// FixedSerializedKey means the key has a fixed length
	FixedSerializedKey
	// VariableSerializedKey means the key has a variable length
	VariableSerializedKey
)
Variables ¶
var (
	// EnableHashJoinV2 enables hash join v2, used for test
	EnableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionOptimized
	// DisableHashJoinV2 disables hash join v2, used for test
	DisableHashJoinV2 = "set tidb_hash_join_version = " + joinversion.HashJoinVersionLegacy
	// HashJoinV2Strings is used for test
	HashJoinV2Strings = []string{DisableHashJoinV2, EnableHashJoinV2}
)
var IsChildCloseCalledForTest atomic.Bool
IsChildCloseCalledForTest is used for test
var ShardCount = 320
ShardCount controls the number of shard maps within the concurrent map.
Functions ¶
func IsHashJoinV2Supported ¶
func IsHashJoinV2Supported() bool
IsHashJoinV2Supported returns true if hash join v2 is supported in the current environment.
func JoinerType ¶
func JoinerType(j Joiner) logicalop.JoinType
JoinerType returns the join type of a Joiner.
func NewConcurrentMapHashTable ¶
func NewConcurrentMapHashTable() *concurrentMapHashTable
NewConcurrentMapHashTable creates a concurrentMapHashTable
func NewJoinRuntimeStats ¶
func NewJoinRuntimeStats() *joinRuntimeStats
NewJoinRuntimeStats returns a new joinRuntimeStats
Types ¶
type BaseHashTable ¶
type BaseHashTable interface {
Put(hashKey uint64, rowPtr chunk.RowPtr)
// e := Get(hashKey)
// for ; e != nil; e = e.Next {
// rowPtr := e.Ptr
// ...
// }
Get(hashKey uint64) *entry
Len() uint64
// GetAndCleanMemoryDelta gets and cleans the memDelta of the BaseHashTable. Memory delta will be cleared after each fetch.
// It indicates the memory delta of the BaseHashTable since the last calling GetAndCleanMemoryDelta().
GetAndCleanMemoryDelta() int64
Iter(func(uint64, *entry))
}
BaseHashTable is the interface of the hash table used in hash join
type BuildWorkerV1 ¶
type BuildWorkerV1 struct {
HashJoinCtx *HashJoinCtxV1
BuildNAKeyColIdx []int
// contains filtered or unexported fields
}
BuildWorkerV1 is the build side worker in hash join
func (*BuildWorkerV1) BuildHashTableForList ¶
func (w *BuildWorkerV1) BuildHashTableForList(buildSideResultCh <-chan *chunk.Chunk) error
BuildHashTableForList builds hash table from `list`.
type BuildWorkerV2 ¶
type BuildWorkerV2 struct {
HashJoinCtx *HashJoinCtxV2
BuildTypes []*types.FieldType
HasNullableKey bool
WorkerID uint
// contains filtered or unexported fields
}
BuildWorkerV2 is the build worker used in hash join v2
func NewJoinBuildWorkerV2 ¶
func NewJoinBuildWorkerV2(ctx *HashJoinCtxV2, workID uint, buildSideExec exec.Executor, buildKeyColIdx []int, buildTypes []*types.FieldType) *BuildWorkerV2
NewJoinBuildWorkerV2 creates a BuildWorkerV2.
type HashContext ¶
type HashContext struct {
// AllTypes has a one-to-one correspondence with KeyColIdx
AllTypes []*types.FieldType
KeyColIdx []int
NaKeyColIdx []int
Buf []byte
HashVals []hash.Hash64
HasNull []bool
// contains filtered or unexported fields
}
HashContext keeps the needed hash context of a db table in hash join.
type HashJoinCtxV1 ¶
type HashJoinCtxV1 struct {
UseOuterToBuild bool
IsOuterJoin bool
RowContainer *hashRowContainer
ProbeTypes []*types.FieldType
BuildTypes []*types.FieldType
OuterFilter expression.CNFExprs
// contains filtered or unexported fields
}
HashJoinCtxV1 is the context used in hash join
type HashJoinCtxV2 ¶
type HashJoinCtxV2 struct {
ProbeKeyTypes []*types.FieldType
BuildKeyTypes []*types.FieldType
RightAsBuildSide bool
BuildFilter expression.CNFExprs
ProbeFilter expression.CNFExprs
OtherCondition expression.CNFExprs
LUsed, RUsed []int
LUsedInOtherCondition, RUsedInOtherCondition []int
// contains filtered or unexported fields
}
HashJoinCtxV2 is the hash join ctx used in hash join v2
func (*HashJoinCtxV2) SetupPartitionInfo ¶
func (hCtx *HashJoinCtxV2) SetupPartitionInfo()
SetupPartitionInfo sets up partitionNumber and partitionMaskOffset based on concurrency.
type HashJoinV1Exec ¶
type HashJoinV1Exec struct {
exec.BaseExecutor
*HashJoinCtxV1
ProbeSideTupleFetcher *ProbeSideTupleFetcherV1
ProbeWorkers []*ProbeWorkerV1
BuildWorker *BuildWorkerV1
Prepared bool
// contains filtered or unexported fields
}
HashJoinV1Exec implements the hash join algorithm.
func (*HashJoinV1Exec) Close ¶
func (e *HashJoinV1Exec) Close() error
Close implements the Executor Close interface.
func (*HashJoinV1Exec) Next ¶
Next implements the Executor Next interface. hash join constructs the result following these steps: step 1. fetch data from build side child and build a hash table; step 2. fetch data from probe child in a background goroutine and probe the hash table in multiple join workers.
type HashJoinV2Exec ¶
type HashJoinV2Exec struct {
exec.BaseExecutor
*HashJoinCtxV2
ProbeSideTupleFetcher *ProbeSideTupleFetcherV2
ProbeWorkers []*ProbeWorkerV2
BuildWorkers []*BuildWorkerV2
// contains filtered or unexported fields
}
HashJoinV2Exec implements the hash join algorithm.
func (*HashJoinV2Exec) Close ¶
func (e *HashJoinV2Exec) Close() error
Close implements the Executor Close interface.
func (*HashJoinV2Exec) Next ¶
Next implements the Executor Next interface. hash join constructs the result following these steps: step 1. fetch data from build side child and build a hash table; step 2. fetch data from probe child in a background goroutine and probe the hash table in multiple join workers.
type IndexJoinExecutorBuilder ¶
type IndexJoinExecutorBuilder interface {
BuildExecutorForIndexJoin(ctx context.Context, lookUpContents []*IndexJoinLookUpContent,
indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error)
}
IndexJoinExecutorBuilder is the interface used by index lookup join to build the executor; this interface exists to avoid an import cycle.
type IndexJoinLookUpContent ¶
type IndexJoinLookUpContent struct {
Keys []types.Datum
Row chunk.Row
KeyColIDs []int64 // the original ID in its table, used by dynamic partition pruning
// contains filtered or unexported fields
}
IndexJoinLookUpContent is the content used in index lookup join
type IndexLookUpJoin ¶
type IndexLookUpJoin struct {
exec.BaseExecutor
WorkerWg *sync.WaitGroup
OuterCtx OuterCtx
InnerCtx InnerCtx
JoinResult *chunk.Chunk
Joiner Joiner
IsOuterJoin bool
IndexRanges ranger.MutableRanges
KeyOff2IdxOff []int
// LastColHelper stores the information for the last column if there's a complicated filter like col > x_col and col < x_col + 100.
LastColHelper *plannercore.ColWithCmpFuncManager
Finished *atomic.Value
// contains filtered or unexported fields
}
IndexLookUpJoin employs one outer worker and N inner workers to execute concurrently. It preserves the order of the outer table and supports batch lookup.
The execution flow is very similar to IndexLookUpReader: 1. The outer worker reads N outer rows, builds a task and sends it to the result channel and the inner worker channel. 2. The inner worker receives the task, builds key ranges from the outer rows, fetches the inner rows, and builds a hash map of the inner rows. 3. The main thread receives the task and waits for the inner worker to finish handling it. 4. The main thread joins each outer row by looking up the inner-row hash map in the task.
func (*IndexLookUpJoin) Close ¶
func (e *IndexLookUpJoin) Close() error
Close implements the Executor interface.
type IndexLookUpMergeJoin ¶
type IndexLookUpMergeJoin struct {
exec.BaseExecutor
WorkerWg *sync.WaitGroup
OuterMergeCtx OuterMergeCtx
InnerMergeCtx InnerMergeCtx
Joiners []Joiner
IsOuterJoin bool
IndexRanges ranger.MutableRanges
KeyOff2IdxOff []int
// LastColHelper stores the information for the last column if there's a complicated filter like col > x_col and col < x_col + 100.
LastColHelper *plannercore.ColWithCmpFuncManager
// contains filtered or unexported fields
}
IndexLookUpMergeJoin realizes IndexLookUpJoin by merge join. It preserves the order of the outer table and supports batch lookup.
The execution flow is very similar to IndexLookUpReader: 1. The outer worker reads N outer rows, builds a task and sends it to the result channel and the inner worker channel. 2. The inner worker receives the task, builds key ranges from the outer rows, fetches the inner rows, and then performs the merge join. 3. The main thread receives the task and fetches the results from the task's channel one by one. 4. Once that channel has been closed, the main thread receives the next task.
func (*IndexLookUpMergeJoin) Close ¶
func (e *IndexLookUpMergeJoin) Close() error
Close implements the Executor interface.
type IndexNestedLoopHashJoin ¶
type IndexNestedLoopHashJoin struct {
IndexLookUpJoin
// We build individual joiner for each inner worker when using chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
Joiners []Joiner
KeepOuterOrder bool
// contains filtered or unexported fields
}
IndexNestedLoopHashJoin employs one outer worker and N inner workers to execute concurrently. The output order is not promised.
The execution flow is very similar to IndexLookUpReader: 1. The outer worker reads N outer rows, builds a task and sends it to the inner worker channel. 2. The inner worker receives the tasks and does 3 things for every task:
- builds hash table from the outer rows
- builds key ranges from outer rows and fetches inner rows
- probes the hash table and sends the join result to the main thread channel. Note: step 1 and step 2 run concurrently.
3. The main thread receives the join results.
func (*IndexNestedLoopHashJoin) Close ¶
func (e *IndexNestedLoopHashJoin) Close() error
Close implements the IndexNestedLoopHashJoin Executor interface.
type InnerCtx ¶
type InnerCtx struct {
ReaderBuilder IndexJoinExecutorBuilder
RowTypes []*types.FieldType
KeyCols []int
KeyColIDs []int64 // the original ID in its table, used by dynamic partition pruning
KeyCollators []collate.Collator
HashTypes []*types.FieldType
HashCols []int
HashCollators []collate.Collator
ColLens []int
HasPrefixCol bool
}
InnerCtx is the inner side ctx used in index lookup join
type InnerMergeCtx ¶
type InnerMergeCtx struct {
ReaderBuilder IndexJoinExecutorBuilder
RowTypes []*types.FieldType
JoinKeys []*expression.Column
KeyCols []int
KeyCollators []collate.Collator
CompareFuncs []expression.CompareFunc
ColLens []int
Desc bool
KeyOff2KeyOffOrderByIdx []int
}
InnerMergeCtx is the inner side ctx of merge join
type IterCb ¶
type IterCb func(key uint64, e *entry)
IterCb is an iterator callback, called for every key/value pair found in the maps. The RLock is held for all calls for a given shard, so the callback sees a consistent view of a shard, but not across shards.
type Joiner ¶
type Joiner interface {
// TryToMatchInners tries to join an outer row with a batch of inner rows. When
// 'inners.Len != 0' but all the joined rows are filtered, the outer row is
// considered unmatched. Otherwise, the outer row is matched and some joined
// rows are appended to `chk`. The size of `chk` is limited to MaxChunkSize.
// Note that when the outer row is considered unmatched, we need to differentiate
// whether the join conditions return null or false, because that matters for
// AntiSemiJoin/LeftOuterSemiJoin/AntiLeftOuterSemiJoin, by setting the return
// value isNull; for other join types, isNull is always false.
//
// NOTE: Callers need to call this function multiple times to consume all
// the inner rows for an outer row, and decide whether the outer row can be
// matched with at least one inner row.
TryToMatchInners(outer chunk.Row, inners chunk.Iterator, chk *chunk.Chunk, opt ...NAAJType) (matched bool, isNull bool, err error)
// TryToMatchOuters tries to join a batch of outer rows with one inner row.
// It's used when the join is an outer join and the hash table is built
// using the outer side.
TryToMatchOuters(outer chunk.Iterator, inner chunk.Row, chk *chunk.Chunk, outerRowStatus []outerRowStatusFlag) (_ []outerRowStatusFlag, err error)
// OnMissMatch operates on the unmatched outer row according to the join
// type. An outer row can be considered unmatched if:
// 1. it can not pass the filter on the outer table side.
// 2. there is no inner row with the same join key.
// 3. all the joined rows can not pass the filter on the join result.
//
// On these conditions, the caller calls this function to handle the
// unmatched outer rows according to the current join type:
// 1. 'SemiJoin': ignores the unmatched outer row.
// 2. 'AntiSemiJoin': appends the unmatched outer row to the result buffer.
// 3. 'LeftOuterSemiJoin': concats the unmatched outer row with 0 and
// appends it to the result buffer.
// 4. 'AntiLeftOuterSemiJoin': concats the unmatched outer row with 1 and
// appends it to the result buffer.
// 5. 'LeftOuterJoin': concats the unmatched outer row with a row of NULLs
// and appends it to the result buffer.
// 6. 'RightOuterJoin': concats the unmatched outer row with a row of NULLs
// and appends it to the result buffer.
// 7. 'InnerJoin': ignores the unmatched outer row.
//
// Note that, for LeftOuterSemiJoin, AntiSemiJoin and AntiLeftOuterSemiJoin,
// we need to know the reason of outer row being treated as unmatched:
// whether the join condition returns false, or returns null, because
// it decides if this outer row should be outputted, hence we have a `hasNull`
// parameter passed to `OnMissMatch`.
OnMissMatch(hasNull bool, outer chunk.Row, chk *chunk.Chunk)
// Clone deep copies a Joiner.
Clone() Joiner
// contains filtered or unexported methods
}
Joiner is used to generate join results according to the join type. A typical instruction flow is:
hasMatch, hasNull := false, false
for innerIter.Current() != innerIter.End() {
	matched, isNull, err := j.TryToMatchInners(outer, innerIter, chk)
	// handle err
	hasMatch = hasMatch || matched
	hasNull = hasNull || isNull
}
if !hasMatch {
	j.OnMissMatch(hasNull, outer, chk)
}
NOTE: This interface is **not** thread-safe. TODO: unit test for all join types
- no filter, no inline projection
- no filter, inline projection
- no filter, inline projection to empty column
- filter, no inline projection
- filter, inline projection
- filter, inline projection to empty column
type MergeJoinExec ¶
type MergeJoinExec struct {
exec.BaseExecutor
StmtCtx *stmtctx.StatementContext
CompareFuncs []expression.CompareFunc
Joiner Joiner
IsOuterJoin bool
Desc bool
InnerTable *MergeJoinTable
OuterTable *MergeJoinTable
// contains filtered or unexported fields
}
MergeJoinExec implements the merge join algorithm. This operator assumes that the iterators of both sides provide the required order on the join condition: 1. For equal-join, one of the join keys from each side matches the given order. 2. For other cases, it's preferred not to use SMJ, and the operator will throw an error.
func (*MergeJoinExec) Close ¶
func (e *MergeJoinExec) Close() error
Close implements the Executor Close interface.
type MergeJoinTable ¶
type MergeJoinTable struct {
IsInner bool
ChildIndex int
JoinKeys []*expression.Column
Filters []expression.Expression
// contains filtered or unexported fields
}
MergeJoinTable is used for merge join
type NAAJType ¶
type NAAJType byte
NAAJType is join detail type only used by null-aware AntiLeftOuterSemiJoin.
const (
	// Unknown is the default value.
	Unknown NAAJType = 0
	// LeftHasNullRightNotNull means lhs is a null key, and rhs is not a null key.
	LeftHasNullRightNotNull NAAJType = 1
	// LeftHasNullRightHasNull means lhs is a null key, and rhs is a null key.
	LeftHasNullRightHasNull NAAJType = 2
	// LeftNotNullRightNotNull means lhs is not a null key, and rhs is not a null key.
	LeftNotNullRightNotNull NAAJType = 3
	// LeftNotNullRightHasNull means lhs is not a null key, and rhs is a null key.
	LeftNotNullRightHasNull NAAJType = 4
)
type NestedLoopApplyExec ¶
type NestedLoopApplyExec struct {
exec.BaseExecutor
Sctx sessionctx.Context
InnerExec exec.Executor
OuterExec exec.Executor
InnerFilter expression.CNFExprs
OuterFilter expression.CNFExprs
Joiner Joiner
CanUseCache bool
OuterSchema []*expression.CorrelatedColumn
OuterChunk *chunk.Chunk
InnerList *chunk.List
InnerChunk *chunk.Chunk
Outer bool
// contains filtered or unexported fields
}
NestedLoopApplyExec is the executor for apply.
func (*NestedLoopApplyExec) Close ¶
func (e *NestedLoopApplyExec) Close() error
Close implements the Executor interface.
type OuterCtx ¶
type OuterCtx struct {
RowTypes []*types.FieldType
KeyCols []int
HashTypes []*types.FieldType
HashCols []int
Filter expression.CNFExprs
}
OuterCtx is the outer ctx used in index lookup join
type OuterMergeCtx ¶
type OuterMergeCtx struct {
RowTypes []*types.FieldType
JoinKeys []*expression.Column
KeyCols []int
Filter expression.CNFExprs
NeedOuterSort bool
CompareFuncs []expression.CompareFunc
}
OuterMergeCtx is the outer side ctx of merge join
type ProbeSideTupleFetcherV1 ¶
type ProbeSideTupleFetcherV1 struct {
*HashJoinCtxV1
// contains filtered or unexported fields
}
ProbeSideTupleFetcherV1 reads tuples from ProbeSideExec and sends them to ProbeWorkers.
type ProbeSideTupleFetcherV2 ¶
type ProbeSideTupleFetcherV2 struct {
*HashJoinCtxV2
// contains filtered or unexported fields
}
ProbeSideTupleFetcherV2 reads tuples from ProbeSideExec and sends them to ProbeWorkers.
type ProbeV2 ¶
type ProbeV2 interface {
// SetChunkForProbe will do some pre-work when start probing a chunk
SetChunkForProbe(chunk *chunk.Chunk) error
// SetRestoredChunkForProbe will do some pre-work for a chunk restored from disk
SetRestoredChunkForProbe(chunk *chunk.Chunk) error
// Probe is to probe current chunk, the result chunk is set in result.chk, and Probe need to make sure result.chk.NumRows() <= result.chk.RequiredRows()
Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (ok bool, result *hashjoinWorkerResult)
// IsCurrentChunkProbeDone returns true if current probe chunk is all probed
IsCurrentChunkProbeDone() bool
// SpillRemainingProbeChunks spills remaining probe chunks
SpillRemainingProbeChunks() error
// ScanRowTable is called after all the probe chunks are probed. It is used in some special joins, like left outer join with left side to build, after all
// the probe side chunks are handled, it needs to scan the row table to return the un-matched rows
ScanRowTable(joinResult *hashjoinWorkerResult, sqlKiller *sqlkiller.SQLKiller) (result *hashjoinWorkerResult)
// IsScanRowTableDone returns true after scan row table is done
IsScanRowTableDone() bool
// NeedScanRowTable returns true if current join need to scan row table after all the probe side chunks are handled
NeedScanRowTable() bool
// InitForScanRowTable does some pre-work before ScanRowTable; it must be called before ScanRowTable
InitForScanRowTable()
// GetProbeCollision returns the probe collision statistic
GetProbeCollision() uint64
// ResetProbeCollision resets the probe collision statistic
ResetProbeCollision()
// ResetProbe resets some probe variables
ResetProbe()
}
ProbeV2 is the interface used to do probe in hash join v2
type ProbeWorkerV1 ¶
type ProbeWorkerV1 struct {
HashJoinCtx *HashJoinCtxV1
ProbeKeyColIdx []int
ProbeNAKeyColIdx []int
// We build an individual joiner for each join worker when using chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
Joiner Joiner
// contains filtered or unexported fields
}
ProbeWorkerV1 is the probe side worker in hash join
type ProbeWorkerV2 ¶
type ProbeWorkerV2 struct {
HashJoinCtx *HashJoinCtxV2
// We build an individual joinProbe for each join worker when using chunk-based
// execution, to avoid the concurrency of joiner.chk and joiner.selected.
JoinProbe ProbeV2
// contains filtered or unexported fields
}
ProbeWorkerV2 is the probe worker used in hash join v2
type UpsertCb ¶
type UpsertCb func(exist bool, valueInMap, newValue *entry) *entry
UpsertCb is a callback that returns the new element to be inserted into the map. It is called while the lock is held, so it MUST NOT try to access other keys in the same map, as that can lead to a deadlock: Go's sync.RWMutex is not reentrant.
Source Files
¶
- base_join_probe.go
- concurrent_map.go
- hash_join_base.go
- hash_join_spill.go
- hash_join_spill_helper.go
- hash_join_test_util.go
- hash_join_v1.go
- hash_join_v2.go
- hash_table_v1.go
- hash_table_v2.go
- index_lookup_hash_join.go
- index_lookup_join.go
- index_lookup_merge_join.go
- inner_join_probe.go
- join_row_table.go
- join_table_meta.go
- joiner.go
- merge_join.go
- outer_join_probe.go
- row_table_builder.go
- tagged_ptr.go