Documentation
¶
Index ¶
- func BuildIncrementalQuery(ctx context.Context, opts DriverOptions) (string, []any, error)
- func GetMaxCursorValues(ctx context.Context, client *sqlx.DB, driverType constants.DriverType, ...) (any, any, error)
- func GetPlaceholder(driver constants.DriverType) func(int) string
- func MapScan(rows *sql.Rows, dest map[string]any, ...) error
- func MinMaxQuery(stream types.StreamInterface, column string) string
- func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string
- func MySQLBinlogFormatQuery() string
- func MySQLBinlogRowMetadataQuery() string
- func MySQLDiscoverTablesQuery() string
- func MySQLLogBinQuery() string
- func MySQLMasterStatusQuery() string
- func MySQLMasterStatusQueryNew() string
- func MySQLPrimaryKeyQuery() string
- func MySQLTableColumnsQuery() string
- func MySQLTableExistsQuery(stream types.StreamInterface) string
- func MySQLTableRowStatsQuery() string
- func MySQLTableSchemaQuery() string
- func MySQLVersion(ctx context.Context, client *sqlx.DB) (string, int, int, error)
- func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, ...) string
- func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
- func OracleBlockSizeQuery() string
- func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string
- func OracleChunkRetrievalQuery(taskName string) string
- func OracleChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func OracleChunkTaskCleanerQuery(taskName string) string
- func OracleEmptyCheckQuery(stream types.StreamInterface) string
- func OracleIncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, ...) (string, any, error)
- func OraclePrimaryKeyColummsQuery(schemaName, tableName string) string
- func OracleTableDetailsQuery(schemaName, tableName string) string
- func OracleTableDiscoveryQuery() string
- func OracleTableRowStatsQuery() string
- func OracleTaskCreationQuery(taskName string) string
- func PostgresBlockSizeQuery() string
- func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, ...) string
- func PostgresIsPartitionedQuery(stream types.StreamInterface) string
- func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string
- func PostgresPartitionPages(stream types.StreamInterface) string
- func PostgresRelPageCount(stream types.StreamInterface) string
- func PostgresRowCountQuery(stream types.StreamInterface) string
- func PostgresWalLSNQuery() string
- func QuoteColumns(columns []string, driver constants.DriverType) []string
- func QuoteIdentifier(identifier string, driver constants.DriverType) string
- func QuoteTable(schema, table string, driver constants.DriverType) string
- func SQLFilter(stream types.StreamInterface, driver string, thresholdFilter string) (string, error)
- func ThresholdFilter(ctx context.Context, opts DriverOptions) (string, []any, error)
- func WithIsolation(ctx context.Context, client *sqlx.DB, fn func(tx *sql.Tx) error) error
- type DriverOptions
- type Reader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildIncrementalQuery ¶ added in v0.1.9
BuildIncrementalQuery generates the incremental query SQL based on driver type
func GetMaxCursorValues ¶ added in v0.2.7
func GetPlaceholder ¶ added in v0.2.7
func GetPlaceholder(driver constants.DriverType) func(int) string
GetPlaceholder returns the appropriate placeholder for the given driver
func MinMaxQuery ¶
func MinMaxQuery(stream types.StreamInterface, column string) string
MinMaxQuery returns the query to fetch MIN and MAX values of a column in a Postgres table
func MinMaxQueryMySQL ¶
func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string
MinMaxQueryMySQL returns the query to fetch MIN and MAX values of a column in a MySQL table
func MySQLBinlogFormatQuery ¶ added in v0.1.7
func MySQLBinlogFormatQuery() string
MySQLBinlogFormatQuery returns the query to fetch the binlog_format variable in MySQL
func MySQLBinlogRowMetadataQuery ¶ added in v0.1.7
func MySQLBinlogRowMetadataQuery() string
MySQLBinlogRowMetadataQuery returns the query to fetch the binlog_row_metadata variable in MySQL
func MySQLDiscoverTablesQuery ¶
func MySQLDiscoverTablesQuery() string
MySQLDiscoverTablesQuery returns the query to discover tables in a MySQL database
func MySQLLogBinQuery ¶ added in v0.1.7
func MySQLLogBinQuery() string
MySQLLogBinQuery returns the query to fetch the log_bin variable in MySQL
func MySQLMasterStatusQuery ¶
func MySQLMasterStatusQuery() string
MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.3 and below
func MySQLMasterStatusQueryNew ¶
func MySQLMasterStatusQueryNew() string
MySQLMasterStatusQueryNew returns the query to fetch the current binlog position in MySQL: mysql v8.4 and above
func MySQLPrimaryKeyQuery ¶
func MySQLPrimaryKeyQuery() string
MySQLPrimaryKeyQuery returns the query to fetch the primary key column of a table in MySQL
func MySQLTableColumnsQuery ¶
func MySQLTableColumnsQuery() string
MySQLTableColumnsQuery returns the query to fetch column names of a table in MySQL
func MySQLTableExistsQuery ¶ added in v0.2.4
func MySQLTableExistsQuery(stream types.StreamInterface) string
MySQLTableExistsQuery returns the query to check if a table has any rows using EXISTS
func MySQLTableRowStatsQuery ¶ added in v0.1.9
func MySQLTableRowStatsQuery() string
MySQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL
func MySQLTableSchemaQuery ¶
func MySQLTableSchemaQuery() string
MySQLTableSchemaQuery returns the query to fetch schema information for a table in MySQL
func MySQLVersion ¶
MySQLVersion returns the version of the MySQL server. It returns the flavor, major and minor version of the MySQL server.
func MysqlChunkScanQuery ¶
func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string
MysqlChunkScanQuery builds a chunk scan query for MySQL
func MysqlLimitOffsetScanQuery ¶
func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
MysqlLimitOffsetScanQuery returns the query used to fetch the rows of a chunk using LIMIT and OFFSET
func NextChunkEndQuery ¶
func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
NextChunkEndQuery returns the query to calculate the next chunk boundary. Example: Input:
stream.Namespace() = "mydb"
stream.Name() = "users"
columns = []string{"id", "created_at"}
chunkSize = 1000
Output:
SELECT CONCAT_WS(',', id, created_at) AS key_str FROM (
SELECT id, created_at
FROM `mydb`.`users`
WHERE (`id` > ?) OR (`id` = ? AND `created_at` > ?)
ORDER BY id, created_at
LIMIT 1 OFFSET 1000
) AS subquery
func OracleBlockSizeQuery ¶ added in v0.1.6
func OracleBlockSizeQuery() string
OracleBlockSizeQuery returns the query to fetch the block size in bytes in OracleDB
func OracleChunkCreationQuery ¶ added in v0.1.6
func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string
OracleChunkCreationQuery returns the query to make chunks in OracleDB using DBMS_PARALLEL_EXECUTE
func OracleChunkRetrievalQuery ¶ added in v0.1.6
OracleChunkRetrievalQuery returns the query to retrieve chunks from DBMS_PARALLEL_EXECUTE in OracleDB
func OracleChunkScanQuery ¶ added in v0.1.3
OracleChunkScanQuery returns the query to fetch the rows of a table in OracleDB
func OracleChunkTaskCleanerQuery ¶ added in v0.1.6
OracleChunkTaskCleanerQuery returns the query to clean up a chunk task in OracleDB
func OracleEmptyCheckQuery ¶ added in v0.1.3
func OracleEmptyCheckQuery(stream types.StreamInterface) string
OracleEmptyCheckQuery returns the query to check if a table is empty in OracleDB
func OracleIncrementalValueFormatter ¶ added in v0.1.9
func OracleIncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, lastCursorValue any, opts DriverOptions) (string, any, error)
OracleIncrementalValueFormatter is used to format the value of the cursor field for Oracle incremental sync, mainly because of the various timestamp formats
func OraclePrimaryKeyColummsQuery ¶ added in v0.1.3
OraclePrimaryKeyColummsQuery returns the query to fetch all the primary key columns of a table in OracleDB
func OracleTableDetailsQuery ¶ added in v0.1.3
OracleTableDetailsQuery returns the query to fetch the details of a table in OracleDB
func OracleTableDiscoveryQuery ¶ added in v0.1.6
func OracleTableDiscoveryQuery() string
OracleTableDiscoveryQuery returns the query to fetch the username and table name of all the tables which the current user has access to in OracleDB
func OracleTableRowStatsQuery ¶ added in v0.2.9
func OracleTableRowStatsQuery() string
OracleTableRowStatsQuery returns the query to fetch the estimated row count of a table in Oracle
func OracleTaskCreationQuery ¶ added in v0.1.6
OracleTaskCreationQuery returns the query to create a task in OracleDB
func PostgresBlockSizeQuery ¶ added in v0.1.9
func PostgresBlockSizeQuery() string
PostgresBlockSizeQuery returns the query to fetch the block size in PostgreSQL
func PostgresChunkScanQuery ¶
func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, filter string) string
PostgresChunkScanQuery builds a chunk scan query for PostgreSQL
func PostgresIsPartitionedQuery ¶ added in v0.2.9
func PostgresIsPartitionedQuery(stream types.StreamInterface) string
PostgresIsPartitionedQuery returns a SQL query that checks whether a table is partitioned. It counts how many partitions exist under the given parent table in the specified schema.
func PostgresNextChunkEndQuery ¶
func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string
PostgresNextChunkEndQuery generates a SQL query to fetch the maximum value of a specified column
func PostgresPartitionPages ¶ added in v0.2.9
func PostgresPartitionPages(stream types.StreamInterface) string
PostgresPartitionPages returns total relpages for each partition and the parent table. This can be used to dynamically adjust chunk sizes based on partition distribution.
func PostgresRelPageCount ¶
func PostgresRelPageCount(stream types.StreamInterface) string
PostgresRelPageCount returns the query to fetch relation page count in PostgreSQL
func PostgresRowCountQuery ¶
func PostgresRowCountQuery(stream types.StreamInterface) string
PostgresRowCountQuery returns the query to fetch the estimated row count in PostgreSQL. (PostgreSQL-specific queries. TODO: rewrite these queries to take variables as arguments at execution time.)
func PostgresWalLSNQuery ¶
func PostgresWalLSNQuery() string
PostgresWalLSNQuery returns the query to fetch the current WAL LSN in PostgreSQL
func QuoteColumns ¶ added in v0.2.0
func QuoteColumns(columns []string, driver constants.DriverType) []string
QuoteColumns returns a slice of quoted column names
func QuoteIdentifier ¶ added in v0.2.0
func QuoteIdentifier(identifier string, driver constants.DriverType) string
QuoteIdentifier returns the properly quoted identifier based on database driver
func QuoteTable ¶ added in v0.2.0
func QuoteTable(schema, table string, driver constants.DriverType) string
QuoteTable returns the properly quoted schema.table combination
func SQLFilter ¶ added in v0.1.6
SQLFilter converts a filter string to a valid SQL WHERE condition, and also appends the threshold filter if present
func ThresholdFilter ¶ added in v0.2.7
ThresholdFilter is used to update the filter for the initial run of incremental sync during backfill. This is to avoid duplication of records, as the max cursor value is fetched before the chunk creation.
Types ¶
type DriverOptions ¶ added in v0.2.7
type DriverOptions struct {
Driver constants.DriverType
Stream types.StreamInterface
State *types.State
Client *sqlx.DB
}
DriverOptions contains options for creating various queries