Documentation
¶
Index ¶
- func BuildIncrementalQuery(ctx context.Context, opts DriverOptions) (string, []any, error)
- func DB2ApproxRowCountQuery(stream types.StreamInterface) string
- func DB2AvgRowSizeQuery(stream types.StreamInterface) string
- func DB2DiscoveryQuery() string
- func DB2MinMaxPKQuery(stream types.StreamInterface, columns []string) string
- func DB2MinMaxRidQuery(stream types.StreamInterface) string
- func DB2NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
- func DB2PKChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, ...) string
- func DB2PageStatsQuery(stream types.StreamInterface) string
- func DB2RidChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func DB2TableSchemaAndPrimaryKeysQuery() string
- func DB2TableStatsExistQuery(stream types.StreamInterface) string
- func GetMaxCursorValues(ctx context.Context, client *sqlx.DB, driverType constants.DriverType, ...) (any, any, error)
- func GetPlaceholder(driver constants.DriverType) func(int) string
- func IncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, ...) (string, any, error)
- func MSSQLCDCAdvanceLSNQuery() string
- func MSSQLCDCDiscoverQuery(streamID string) string
- func MSSQLCDCGetChangesQuery(captureInstance string) string
- func MSSQLCDCMaxLSNQuery() string
- func MSSQLCDCSupportQuery() string
- func MSSQLCDCTableEnabledQuery() string
- func MSSQLChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, ...) string
- func MSSQLDiscoverTablesQuery() string
- func MSSQLNextChunkEndQuery(stream types.StreamInterface, orderingColumns []string, chunkSize int64) string
- func MSSQLPhysLocChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func MSSQLPhysLocExtremesQuery(stream types.StreamInterface) string
- func MSSQLPhysLocNextChunkEndQuery(stream types.StreamInterface, chunkSize int64) string
- func MSSQLTableExistsQuery(stream types.StreamInterface) string
- func MSSQLTableRowStatsQuery() string
- func MSSQLTableSchemaQuery() string
- func MapScan(rows *sql.Rows, dest map[string]any, ...) error
- func MapScanConcurrent(setter *Reader[*sql.Rows], ...) error
- func MinMaxQuery(stream types.StreamInterface, column string) string
- func MinMaxQueryMSSQL(stream types.StreamInterface, columns []string) string
- func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string
- func MySQLBinlogFormatQuery() string
- func MySQLBinlogRowMetadataQuery() string
- func MySQLDiscoverTablesQuery() string
- func MySQLLogBinQuery() string
- func MySQLMasterStatusQuery() string
- func MySQLMasterStatusQueryNew() string
- func MySQLPrimaryKeyQuery() string
- func MySQLTableColumnsQuery() string
- func MySQLTableExistsQuery(stream types.StreamInterface) string
- func MySQLTableRowStatsQuery() string
- func MySQLTableSchemaQuery() string
- func MySQLTimeZoneQuery() string
- func MySQLVersion(ctx context.Context, client *sqlx.DB) (string, int, int, error)
- func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, ...) string
- func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
- func OracleBlockSizeQuery() string
- func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string
- func OracleChunkRetrievalQuery(taskName string) string
- func OracleChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func OracleChunkTaskCleanerQuery(taskName string) string
- func OracleEmptyCheckQuery(stream types.StreamInterface) string
- func OraclePrimaryKeyColummsQuery(schemaName, tableName string) string
- func OracleTableDetailsQuery(schemaName, tableName string) string
- func OracleTableDiscoveryQuery() string
- func OracleTableRowStatsQuery() string
- func OracleTaskCreationQuery(taskName string) string
- func PostgresBlockSizeQuery() string
- func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, ...) string
- func PostgresIsPartitionedQuery(stream types.StreamInterface) string
- func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string
- func PostgresPartitionPages(stream types.StreamInterface) string
- func PostgresRelPageCount(stream types.StreamInterface) string
- func PostgresRowCountQuery(stream types.StreamInterface) string
- func PostgresWalLSNQuery() string
- func QuoteColumns(columns []string, driver constants.DriverType) []string
- func QuoteIdentifier(identifier string, driver constants.DriverType) string
- func QuoteTable(schema, table string, driver constants.DriverType) string
- func SQLFilter(stream types.StreamInterface, driver string, thresholdFilter string) (string, error)
- func ThresholdFilter(ctx context.Context, opts DriverOptions) (string, []any, error)
- func WithIsolation(ctx context.Context, client *sqlx.DB, readOnly bool, fn func(tx *sql.Tx) error) error
- type DriverOptions
- type Reader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildIncrementalQuery ¶ added in v0.1.9
BuildIncrementalQuery generates the incremental query SQL based on driver type
func DB2ApproxRowCountQuery ¶ added in v0.3.14
func DB2ApproxRowCountQuery(stream types.StreamInterface) string
DB2ApproxRowCountQuery uses generic CARD from SYSCAT.TABLES (CARD is approx).
func DB2AvgRowSizeQuery ¶ added in v0.3.14
func DB2AvgRowSizeQuery(stream types.StreamInterface) string
DB2AvgRowSizeQuery returns the query to fetch the average row size for a table in DB2
func DB2DiscoveryQuery ¶ added in v0.3.14
func DB2DiscoveryQuery() string
DB2DiscoveryQuery returns the query to discover tables in a DB2 database with filter for 'T' (Table) and 'V' (View).
func DB2MinMaxPKQuery ¶ added in v0.3.14
func DB2MinMaxPKQuery(stream types.StreamInterface, columns []string) string
DB2MinMaxPKQuery returns the query to fetch min/max values of a column
func DB2MinMaxRidQuery ¶ added in v0.3.14
func DB2MinMaxRidQuery(stream types.StreamInterface) string
DB2MinMaxRidQuery to find the min/max of RIDs for chunking
func DB2NextChunkEndQuery ¶ added in v0.3.14
func DB2NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
DB2NextChunkEndQuery returns the query to calculate the next chunk boundary for DB2
func DB2PKChunkScanQuery ¶ added in v0.3.14
func DB2PKChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string
DB2PKChunkScanQuery builds a chunk scan query for DB2 using Primary Keys
func DB2PageStatsQuery ¶ added in v0.3.14
func DB2PageStatsQuery(stream types.StreamInterface) string
DB2PageStatsQuery returns the query to fetch the page size and number of pages for a table
func DB2RidChunkScanQuery ¶ added in v0.3.14
DB2RidChunkScanQuery returns the query to fetch rows for a specific chunk using RID
func DB2TableSchemaAndPrimaryKeysQuery ¶ added in v0.3.14
func DB2TableSchemaAndPrimaryKeysQuery() string
DB2TableSchemaAndPrimaryKeysQuery returns the query to fetch columns for a specific table
func DB2TableStatsExistQuery ¶ added in v0.3.14
func DB2TableStatsExistQuery(stream types.StreamInterface) string
DB2TableStatsExistQuery returns the query to check if a table exists in DB2
func GetMaxCursorValues ¶ added in v0.2.7
func GetPlaceholder ¶ added in v0.2.7
func GetPlaceholder(driver constants.DriverType) func(int) string
GetPlaceholder returns the appropriate placeholder for the given driver
func IncrementalValueFormatter ¶ added in v0.3.14
func IncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, lastCursorValue any, opts DriverOptions) (string, any, error)
IncrementalValueFormatter is used to format the value of the cursor field for incremental sync, mainly because of the various timestamp formats
func MSSQLCDCAdvanceLSNQuery ¶ added in v0.3.15
func MSSQLCDCAdvanceLSNQuery() string
MSSQLCDCAdvanceLSNQuery returns the query to increment an LSN for CDC
func MSSQLCDCDiscoverQuery ¶ added in v0.3.15
MSSQLCDCDiscoverQuery returns the query to discover CDC-enabled capture instances
func MSSQLCDCGetChangesQuery ¶ added in v0.3.15
MSSQLCDCGetChangesQuery returns the query to fetch CDC changes for a capture instance
func MSSQLCDCMaxLSNQuery ¶ added in v0.3.15
func MSSQLCDCMaxLSNQuery() string
TODO: check about `sys.fn_cdc_get_min_lsn` MSSQLCDCMaxLSNQuery returns the query to fetch the current maximum LSN for CDC
func MSSQLCDCSupportQuery ¶ added in v0.3.15
func MSSQLCDCSupportQuery() string
MSSQLCDCSupportQuery returns the query to check if CDC is enabled for the current database
func MSSQLCDCTableEnabledQuery ¶ added in v0.3.15
func MSSQLCDCTableEnabledQuery() string
MSSQLCDCTableEnabledQuery returns the query to check if CDC is enabled for a specific table
func MSSQLChunkScanQuery ¶ added in v0.3.15
func MSSQLChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, filter string) string
MSSQLChunkScanQuery returns the SQL query for scanning a chunk in MSSQL
func MSSQLDiscoverTablesQuery ¶ added in v0.3.15
func MSSQLDiscoverTablesQuery() string
MSSQLDiscoverTablesQuery returns the query to discover tables in a MSSQL database
func MSSQLNextChunkEndQuery ¶ added in v0.3.15
func MSSQLNextChunkEndQuery(stream types.StreamInterface, orderingColumns []string, chunkSize int64) string
MSSQLNextChunkEndQuery returns the query to calculate the next chunk boundary.
Example:
stream.Namespace() = "dbo"
stream.Name() = "orders"
orderingColumns = []string{"order_id", "item_id"}
chunkSize = 1000
Conceptual output:
WITH ordered AS (
SELECT
<concat(order_id, item_id)> AS key_str,
ROW_NUMBER() OVER (ORDER BY [order_id], [item_id]) AS rn
FROM [dbo].[orders]
WHERE
([order_id] > @p1)
OR ([order_id] = @p1 AND [item_id] > @p2)
)
SELECT key_str FROM ordered WHERE rn = 1000;
func MSSQLPhysLocChunkScanQuery ¶ added in v0.3.15
func MSSQLPhysLocChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
MSSQLPhysLocChunkScanQuery returns the SQL query for scanning a chunk using %%physloc%% in MSSQL
func MSSQLPhysLocExtremesQuery ¶ added in v0.3.15
func MSSQLPhysLocExtremesQuery(stream types.StreamInterface) string
MSSQLPhysLocExtremesQuery returns the query to fetch MIN and MAX %%physloc%% values for a table
func MSSQLPhysLocNextChunkEndQuery ¶ added in v0.3.15
func MSSQLPhysLocNextChunkEndQuery(stream types.StreamInterface, chunkSize int64) string
MSSQLPhysLocNextChunkEndQuery returns the query to find the next %%physloc%% chunk boundary
func MSSQLTableExistsQuery ¶ added in v0.3.15
func MSSQLTableExistsQuery(stream types.StreamInterface) string
MSSQLTableExistsQuery returns the query to check if a table has any rows
func MSSQLTableRowStatsQuery ¶ added in v0.3.15
func MSSQLTableRowStatsQuery() string
MSSQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MSSQL
func MSSQLTableSchemaQuery ¶ added in v0.3.15
func MSSQLTableSchemaQuery() string
MSSQLTableSchemaQuery returns the query to fetch the column_name, data_type, nullable, and primary_key information of a table in MSSQL
func MapScan ¶
func MapScan(rows *sql.Rows, dest map[string]any, converter func(value interface{}, columnType string) (interface{}, error)) error
TODO: Use MapScanConcurrent instead of MapScan for incremental as well
func MapScanConcurrent ¶ added in v0.3.14
func MinMaxQuery ¶
func MinMaxQuery(stream types.StreamInterface, column string) string
MinMaxQuery returns the query to fetch MIN and MAX values of a column in a Postgres table
func MinMaxQueryMSSQL ¶ added in v0.3.15
func MinMaxQueryMSSQL(stream types.StreamInterface, columns []string) string
func MinMaxQueryMySQL ¶
func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string
MinMaxQueryMySQL returns the query to fetch MIN and MAX values of a column in a MySQL table
func MySQLBinlogFormatQuery ¶ added in v0.1.7
func MySQLBinlogFormatQuery() string
MySQLBinlogFormatQuery returns the query to fetch the binlog_format variable in MySQL
func MySQLBinlogRowMetadataQuery ¶ added in v0.1.7
func MySQLBinlogRowMetadataQuery() string
MySQLBinlogRowMetadataQuery returns the query to fetch the binlog_row_metadata variable in MySQL
func MySQLDiscoverTablesQuery ¶
func MySQLDiscoverTablesQuery() string
MySQLDiscoverTablesQuery returns the query to discover tables in a MySQL database
func MySQLLogBinQuery ¶ added in v0.1.7
func MySQLLogBinQuery() string
MySQLLogBinQuery returns the query to fetch the log_bin variable in MySQL
func MySQLMasterStatusQuery ¶
func MySQLMasterStatusQuery() string
MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.3 and below
func MySQLMasterStatusQueryNew ¶
func MySQLMasterStatusQueryNew() string
MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.4 and above
func MySQLPrimaryKeyQuery ¶
func MySQLPrimaryKeyQuery() string
MySQLPrimaryKeyQuery returns the query to fetch the primary key column of a table in MySQL
func MySQLTableColumnsQuery ¶
func MySQLTableColumnsQuery() string
MySQLTableColumnsQuery returns the query to fetch column names of a table in MySQL
func MySQLTableExistsQuery ¶ added in v0.2.4
func MySQLTableExistsQuery(stream types.StreamInterface) string
MySQLTableExistsQuery returns the query to check if a table has any rows using EXISTS
func MySQLTableRowStatsQuery ¶ added in v0.1.9
func MySQLTableRowStatsQuery() string
MySQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL
func MySQLTableSchemaQuery ¶
func MySQLTableSchemaQuery() string
MySQLTableSchemaQuery returns the query to fetch schema information for a table in MySQL
func MySQLTimeZoneQuery ¶ added in v0.3.16
func MySQLTimeZoneQuery() string
MySQLTimeZoneQuery returns the query to fetch the timezone of the MySQL server and system timezone
func MySQLVersion ¶
MySQLVersion returns the version of the MySQL server It returns the flavor, major and minor version of the MySQL server
func MysqlChunkScanQuery ¶
func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string
MySQLWithoutState builds a chunk scan query for MySql
func MysqlLimitOffsetScanQuery ¶
func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
MysqlLimitOffsetScanQuery is used to get the rows
func NextChunkEndQuery ¶
func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
NextChunkEndQuery returns the query to calculate the next chunk boundary Example: Input:
stream.Namespace() = "mydb"
stream.Name() = "users"
columns = []string{"id", "created_at"}
chunkSize = 1000
Output:
SELECT CONCAT_WS(',', id, created_at) AS key_str FROM (
SELECT (id, created_at)
FROM `mydb`.`users`
WHERE (`id` > ?) OR (`id` = ? AND `created_at` > ?)
ORDER BY id, created_at
LIMIT 1 OFFSET 1000
) AS subquery
func OracleBlockSizeQuery ¶ added in v0.1.6
func OracleBlockSizeQuery() string
OracleTableSizeQuery returns the query to fetch the size of a table in bytes in OracleDB
func OracleChunkCreationQuery ¶ added in v0.1.6
func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string
OracleChunkCreationQuery returns the query to make chunks in OracleDB using DBMS_PARALLEL_EXECUTE
func OracleChunkRetrievalQuery ¶ added in v0.1.6
OracleChunkRetrievalQuery returns the query to retrieve chunks from DBMS_PARALLEL_EXECUTE in OracleDB
func OracleChunkScanQuery ¶ added in v0.1.3
OracleChunkScanQuery returns the query to fetch the rows of a table in OracleDB
func OracleChunkTaskCleanerQuery ¶ added in v0.1.6
OracleChunkTaskCleanerQuery returns the query to clean up a chunk task in OracleDB
func OracleEmptyCheckQuery ¶ added in v0.1.3
func OracleEmptyCheckQuery(stream types.StreamInterface) string
OracleEmptyCheckQuery returns the query to check if a table is empty in OracleDB
func OraclePrimaryKeyColummsQuery ¶ added in v0.1.3
OraclePrimaryKeyQuery returns the query to fetch all the primary key columns of a table in OracleDB
func OracleTableDetailsQuery ¶ added in v0.1.3
OracleTableDetailsQuery returns the query to fetch the details of a table in OracleDB
func OracleTableDiscoveryQuery ¶ added in v0.1.6
func OracleTableDiscoveryQuery() string
OracleTableDiscoveryQuery returns the query to fetch the username and table name of all the tables which the current user has access to in OracleDB
func OracleTableRowStatsQuery ¶ added in v0.2.9
func OracleTableRowStatsQuery() string
OracleTableRowStatsQuery returns the query to fetch the estimated row count of a table in Oracle
func OracleTaskCreationQuery ¶ added in v0.1.6
OracleTaskCreationQuery returns the query to create a task in OracleDB
func PostgresBlockSizeQuery ¶ added in v0.1.9
func PostgresBlockSizeQuery() string
PostgresBlockSizeQuery returns the query to fetch the block size in PostgreSQL
func PostgresChunkScanQuery ¶
func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, filter string) string
PostgresBuildSplitScanQuery builds a chunk scan query for PostgreSQL
func PostgresIsPartitionedQuery ¶ added in v0.2.9
func PostgresIsPartitionedQuery(stream types.StreamInterface) string
PostgresIsPartitionedQuery returns a SQL query that checks whether a table is partitioned. It counts how many partitions exist under the given parent table in the specified schema.
func PostgresNextChunkEndQuery ¶
func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string
PostgresNextChunkEndQuery generates a SQL query to fetch the maximum value of a specified column
func PostgresPartitionPages ¶ added in v0.2.9
func PostgresPartitionPages(stream types.StreamInterface) string
PostgresPartitionPages returns total relpages for each partition and the parent table. This can be used to dynamically adjust chunk sizes based on partition distribution.
func PostgresRelPageCount ¶
func PostgresRelPageCount(stream types.StreamInterface) string
PostgresRelPageCount returns the query to fetch relation page count in PostgreSQL
func PostgresRowCountQuery ¶
func PostgresRowCountQuery(stream types.StreamInterface) string
PostgreSQL-Specific Queries TODO: Rewrite queries for taking vars as arguments while execution. PostgresRowCountQuery returns the query to fetch the estimated row count in PostgreSQL
func PostgresWalLSNQuery ¶
func PostgresWalLSNQuery() string
PostgresWalLSNQuery returns the query to fetch the current WAL LSN in PostgreSQL
func QuoteColumns ¶ added in v0.2.0
func QuoteColumns(columns []string, driver constants.DriverType) []string
QuoteColumns returns a slice of quoted column names
func QuoteIdentifier ¶ added in v0.2.0
func QuoteIdentifier(identifier string, driver constants.DriverType) string
QuoteIdentifier returns the properly quoted identifier based on database driver
func QuoteTable ¶ added in v0.2.0
func QuoteTable(schema, table string, driver constants.DriverType) string
QuoteTable returns the properly quoted schema.table combination
func SQLFilter ¶ added in v0.1.6
ParseFilter converts a filter string to a valid SQL WHERE condition, also appends the threshold filter if present
func ThresholdFilter ¶ added in v0.2.7
ThresholdFilter is used to update the filter for initial run of incremental sync during backfill. This is to avoid dupliction of records, as max cursor value is fetched before the chunk creation.
Types ¶
type DriverOptions ¶ added in v0.2.7
type DriverOptions struct {
Driver constants.DriverType
Stream types.StreamInterface
State *types.State
Client *sqlx.DB
}
DriverOptions contains options for creating various queries