Documentation
¶
Overview ¶
Package table contains some common utilities for working with tables such as a 'Chunker' feature.
Index ¶
- Constants
- Variables
- func LazyFindP90(a []time.Duration) time.Duration
- func QuoteColumns(cols []string) string
- type Boundary
- type Chunk
- type Chunker
- type Datum
- func (d Datum) Add(addVal uint64) Datum
- func (d Datum) GreaterThan(d2 Datum) bool
- func (d Datum) GreaterThanOrEqual(d2 Datum) bool
- func (d Datum) IsBinaryString() bool
- func (d Datum) IsNil() bool
- func (d Datum) IsNumeric() bool
- func (d Datum) MaxValue() Datum
- func (d Datum) MinValue() Datum
- func (d Datum) Range(d2 Datum) uint64
- func (d Datum) String() string
- type FeedbackCall
- type JSONBoundary
- type JSONChunk
- type MockChunker
- func (m *MockChunker) Close() error
- func (m *MockChunker) Feedback(chunk *Chunk, duration time.Duration, actualRows uint64)
- func (m *MockChunker) GetCallCounts() (next, progress int)
- func (m *MockChunker) GetFeedbackCalls() []FeedbackCall
- func (m *MockChunker) GetLowWatermark() (string, error)
- func (m *MockChunker) IsRead() bool
- func (m *MockChunker) KeyAboveHighWatermark(key any) bool
- func (m *MockChunker) KeyBelowLowWatermark(key any) bool
- func (m *MockChunker) MarkAsComplete()
- func (m *MockChunker) Next() (*Chunk, error)
- func (m *MockChunker) Open() error
- func (m *MockChunker) OpenAtWatermark(watermark string) error
- func (m *MockChunker) Progress() (uint64, uint64, uint64)
- func (m *MockChunker) Reset() error
- func (m *MockChunker) SetChunkSize(size uint64)
- func (m *MockChunker) SetCloseError(err error)
- func (m *MockChunker) SetNextError(err error)
- func (m *MockChunker) SetOpenError(err error)
- func (m *MockChunker) SetWatermarkError(err error)
- func (m *MockChunker) SimulateProgress(percentage float64)
- func (m *MockChunker) Tables() []*TableInfo
- type Operator
- type TableInfo
- func (t *TableInfo) AutoUpdateStatistics(ctx context.Context, interval time.Duration, logger loggers.Advanced)
- func (t *TableInfo) Close() error
- func (t *TableInfo) DescIndex(keyName string) ([]string, error)
- func (t *TableInfo) GetColumnMySQLType(col string) (string, bool)
- func (t *TableInfo) MaxValue() Datum
- func (t *TableInfo) PrimaryKeyIsMemoryComparable() error
- func (t *TableInfo) PrimaryKeyValues(row any) ([]any, error)
- func (t *TableInfo) SetInfo(ctx context.Context) error
- func (t *TableInfo) WrapCastType(col string) string
Constants ¶
const ( // StartingChunkSize is the initial chunkSize StartingChunkSize = 1000 // MaxDynamicStepFactor is the maximum amount each recalculation of the dynamic chunkSize can // increase by. For example, if the newTarget is 5000 but the current target is 1000, the newTarget // will be capped back down to 1500. Over time the number 5000 will be reached, but not straight away. MaxDynamicStepFactor = 1.5 // MinDynamicRowSize is the minimum chunkSize that can be used when dynamic chunkSize is enabled. // This helps prevent a scenario where the chunk size is too small (it can never be less than 1). MinDynamicRowSize = 10 // MaxDynamicRowSize is the max allowed chunkSize that can be used when dynamic chunkSize is enabled. // This seems like a safe upper bound for now MaxDynamicRowSize = 100000 // DynamicPanicFactor is the factor by which the feedback process takes immediate action when // the chunkSize appears to be too large. For example, if the PanicFactor is 5, and the target *time* // is 50ms, an actual time 250ms+ will cause the dynamic chunk size to immediately be reduced. DynamicPanicFactor = 5 // ChunkerDefaultTarget is the default chunker target ChunkerDefaultTarget = 100 * time.Millisecond )
Variables ¶
Functions ¶
func LazyFindP90 ¶
LazyFindP90 finds the second to last value in a slice. This is the same as a p90 if there are 10 values, but if there were 100 values it would technically be a p99 etc.
func QuoteColumns ¶
Types ¶
type Chunk ¶
type Chunk struct {
Key []string
ChunkSize uint64
LowerBound *Boundary
UpperBound *Boundary
AdditionalConditions string
Table *TableInfo // Source table information for this chunk
NewTable *TableInfo // Destination table information for this chunk
}
Chunk is returned by chunk.Next() Applications can use it to iterate over the rows.
type Chunker ¶
type Chunker interface {
Open() error
IsRead() bool
Close() error
Next() (*Chunk, error)
Feedback(chunk *Chunk, duration time.Duration, actualRows uint64)
KeyAboveHighWatermark(key0 any) bool
KeyBelowLowWatermark(key0 any) bool
Progress() (rowsRead uint64, chunksCopied uint64, totalRowsExpected uint64)
OpenAtWatermark(watermark string) error
GetLowWatermark() (watermark string, err error)
// Reset resets the chunker to start from the beginning, as if Open() was just called.
// This is used when retrying operations like checksums.
Reset() error
// Tables return a list of table names
// By convention the first table is the "current" table,
// and the second table (if any) is the "new" table.
// There could be more than 2 tables in the case of multi-chunker.
// In which case every second table is the "new" table, etc.
Tables() []*TableInfo
}
func NewChunker ¶
func NewCompositeChunker ¶
func NewCompositeChunker(t *TableInfo, chunkerTarget time.Duration, logger loggers.Advanced, keyName string, whereCondition string) (Chunker, error)
NewCompositeChunker returns a chunkerComposite , setting its Key if keyName and where conditions are provided
func NewMultiChunker ¶
NewMultiChunker creates a new multi-chunker that wraps multiple chunkers
type Datum ¶
type Datum struct {
Val any
Tp datumTp // signed, unsigned, binary
}
Datum could be a binary string, uint64 or int64.
func NewNilDatum ¶
func NewNilDatum(tp datumTp) Datum
func (Datum) GreaterThan ¶
func (Datum) GreaterThanOrEqual ¶
func (Datum) IsBinaryString ¶
type FeedbackCall ¶
type JSONBoundary ¶
type JSONChunk ¶
type JSONChunk struct {
Key []string
ChunkSize uint64
LowerBound JSONBoundary
UpperBound JSONBoundary
}
type MockChunker ¶
type MockChunker struct {
// contains filtered or unexported fields
}
MockChunker provides a controllable chunker for testing multi-chunker behavior
func NewMockChunker ¶
func NewMockChunker(tableName string, totalRows uint64) *MockChunker
NewMockChunker creates a new mock chunker for testing
func (*MockChunker) Close ¶
func (m *MockChunker) Close() error
func (*MockChunker) Feedback ¶
func (m *MockChunker) Feedback(chunk *Chunk, duration time.Duration, actualRows uint64)
func (*MockChunker) GetCallCounts ¶
func (m *MockChunker) GetCallCounts() (next, progress int)
func (*MockChunker) GetFeedbackCalls ¶
func (m *MockChunker) GetFeedbackCalls() []FeedbackCall
func (*MockChunker) GetLowWatermark ¶
func (m *MockChunker) GetLowWatermark() (string, error)
func (*MockChunker) IsRead ¶
func (m *MockChunker) IsRead() bool
func (*MockChunker) KeyAboveHighWatermark ¶
func (m *MockChunker) KeyAboveHighWatermark(key any) bool
KeyAboveHighWatermark returns true if the given key is above the current watermark It returns FALSE in cases that are difficult to determine (e.g. non-numeric keys)
func (*MockChunker) KeyBelowLowWatermark ¶
func (m *MockChunker) KeyBelowLowWatermark(key any) bool
KeyBelowLowWatermark returns true if the given key is below the current low watermark It returns TRUE in cases that are difficult to determine (e.g. non-numeric keys)
func (*MockChunker) MarkAsComplete ¶
func (m *MockChunker) MarkAsComplete()
func (*MockChunker) Next ¶
func (m *MockChunker) Next() (*Chunk, error)
func (*MockChunker) OpenAtWatermark ¶
func (m *MockChunker) OpenAtWatermark(watermark string) error
func (*MockChunker) Reset ¶
func (m *MockChunker) Reset() error
func (*MockChunker) SetChunkSize ¶
func (m *MockChunker) SetChunkSize(size uint64)
func (*MockChunker) SetCloseError ¶
func (m *MockChunker) SetCloseError(err error)
func (*MockChunker) SetNextError ¶
func (m *MockChunker) SetNextError(err error)
func (*MockChunker) SetOpenError ¶
func (m *MockChunker) SetOpenError(err error)
Configuration methods
func (*MockChunker) SetWatermarkError ¶
func (m *MockChunker) SetWatermarkError(err error)
func (*MockChunker) SimulateProgress ¶
func (m *MockChunker) SimulateProgress(percentage float64)
Test helper methods
func (*MockChunker) Tables ¶
func (m *MockChunker) Tables() []*TableInfo
type TableInfo ¶
type TableInfo struct {
sync.Mutex
EstimatedRows uint64 // used by the composite chunker for Max
SchemaName string
TableName string
QuotedName string
Columns []string // all the column names
NonGeneratedColumns []string // all the non-generated column names
Indexes []string // all the index names
KeyColumns []string // the column names of the primaryKey
KeyIsAutoInc bool // if pk[0] is an auto_increment column
DisableAutoUpdateStatistics atomic.Bool
// contains filtered or unexported fields
}
func (*TableInfo) AutoUpdateStatistics ¶
func (t *TableInfo) AutoUpdateStatistics(ctx context.Context, interval time.Duration, logger loggers.Advanced)
AutoUpdateStatistics runs a loop that updates the table statistics every interval. This will continue until Close() is called on the tableInfo, or t.DisableAutoUpdateStatistics is set to true.
func (*TableInfo) GetColumnMySQLType ¶
GetColumnMySQLType returns the MySQL type for a given column name
func (*TableInfo) PrimaryKeyIsMemoryComparable ¶
PrimaryKeyIsMemoryComparable checks that the PRIMARY KEY type is compatible. We no longer need this check for the chunker, since it can handle any type of key in the composite chunker. But the migration still needs to verify this, because of the delta map feature, which requires binary comparable keys.
func (*TableInfo) PrimaryKeyValues ¶
PrimaryKeyValues helps extract the PRIMARY KEY from a row image. It uses our knowledge of the ordinal position of columns to find the position of primary key columns (there might be more than one). For minimal row image, you need to send the before image to extract the PK. This is because in the after image, the PK might be nil.
func (*TableInfo) SetInfo ¶
SetInfo reads from MySQL metadata (usually infoschema) and sets the values in TableInfo.