Documentation
¶
Index ¶
- func BuildIncrementalQuery(ctx context.Context, opts DriverOptions) (string, []any, error)
- func DB2ApproxRowCountQuery(stream types.StreamInterface) string
- func DB2AvgRowSizeQuery(stream types.StreamInterface) string
- func DB2DiscoveryQuery() string
- func DB2MinMaxPKQuery(stream types.StreamInterface, columns []string) string
- func DB2MinMaxRidQuery(stream types.StreamInterface) string
- func DB2NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
- func DB2PKChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, ...) string
- func DB2PageStatsQuery(stream types.StreamInterface) string
- func DB2RidChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func DB2TableSchemaAndPrimaryKeysQuery() string
- func DB2TableStatsExistQuery(stream types.StreamInterface) string
- func GetMaxCursorValues(ctx context.Context, client *sqlx.DB, driverType constants.DriverType, ...) (any, any, error)
- func GetPlaceholder(driver constants.DriverType) func(int) string
- func IncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, ...) (string, any, error)
- func MSSQLCDCAdvanceLSNQuery() string
- func MSSQLCDCCaptureJobConfigQuery() string
- func MSSQLCDCCreateCaptureInstanceQuery() string
- func MSSQLCDCDisableCaptureInstanceQuery() string
- func MSSQLCDCDiscoverQuery(streamIDs []string) string
- func MSSQLCDCGetChangesQuery(captureInstance string) string
- func MSSQLCDCGetDDLHistoryBulkQuery(streamIDs []string) string
- func MSSQLCDCLatestScanSessionQuery() string
- func MSSQLCDCMaxLSNQuery() string
- func MSSQLCDCSupportQuery() string
- func MSSQLCDCTableEnabledQuery() string
- func MSSQLChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, ...) string
- func MSSQLColumnTypeQuery() string
- func MSSQLDiscoverTablesQuery() string
- func MSSQLNextChunkEndQuery(stream types.StreamInterface, orderingColumns []string, chunkSize int64) string
- func MSSQLPhysLocChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func MSSQLPhysLocExtremesQuery(stream types.StreamInterface) string
- func MSSQLPhysLocNextChunkEndQuery(stream types.StreamInterface, chunkSize int64) string
- func MSSQLTableExistsQuery(stream types.StreamInterface) string
- func MSSQLTableRowStatsQuery() string
- func MSSQLTableSchemaQuery() string
- func MSSQLViewDatabaseStatePermissionQuery() string
- func MapScan(rows *sql.Rows, dest map[string]any, ...) error
- func MapScanConcurrent(setter *Reader[*sql.Rows], ...) error
- func MinMaxQuery(stream types.StreamInterface, column string) string
- func MinMaxQueryMSSQL(stream types.StreamInterface, columns []string) string
- func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string
- func MySQLBinlogFormatQuery() string
- func MySQLBinlogRowMetadataQuery() string
- func MySQLDiscoverTablesQuery() string
- func MySQLLogBinQuery() string
- func MySQLMasterStatusQuery() string
- func MySQLMasterStatusQueryNew() string
- func MySQLPrimaryKeyQuery() string
- func MySQLTableColumnsQuery() string
- func MySQLTableExistsQuery(stream types.StreamInterface) string
- func MySQLTableRowStatsQuery() string
- func MySQLTableSchemaQuery() string
- func MySQLTimeZoneQuery() string
- func MySQLVersion(ctx context.Context, client *sqlx.DB) (string, int, int, error)
- func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, ...) string
- func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
- func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
- func NextRowIDQuery(stream types.StreamInterface, ROWID string, chunkSize int64) string
- func OracleBlockSizeQuery() string
- func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string
- func OracleChunkRetrievalQuery(taskName string) string
- func OracleChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) (string, error)
- func OracleChunkTaskCleanerQuery(taskName string) string
- func OracleColumnDataTypeQuery(schemaName, tableName, columnName string) string
- func OracleEmptyCheckQuery(stream types.StreamInterface) string
- func OracleMinMaxRowIDQuery(stream types.StreamInterface) string
- func OraclePrimaryKeyColummsQuery(schemaName, tableName string) string
- func OracleTableDetailsQuery(schemaName, tableName string) string
- func OracleTableDiscoveryQuery() string
- func OracleTableRowStatsQuery() string
- func OracleTaskCreationQuery(taskName string) string
- func PostgresBlockSizeQuery() string
- func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, ...) string
- func PostgresIsPartitionedQuery(stream types.StreamInterface) string
- func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string
- func PostgresPartitionPages(stream types.StreamInterface) string
- func PostgresPartitionPagesPG12(stream types.StreamInterface) string
- func PostgresRelPageCount(stream types.StreamInterface) string
- func PostgresRowCountQuery(stream types.StreamInterface) string
- func PostgresServerVersionNum() string
- func PostgresWalLSNQuery() string
- func QuoteColumns(columns []string, driver constants.DriverType) []string
- func QuoteIdentifier(identifier string, driver constants.DriverType) string
- func QuoteTable(schema, table string, driver constants.DriverType) string
- func SQLFilter(stream types.StreamInterface, driver string, thresholdFilter string) (string, error)
- func ThresholdFilter(ctx context.Context, opts DriverOptions) (string, []any, error)
- func WithIsolation(ctx context.Context, client *sqlx.DB, readOnly bool, fn func(tx *sql.Tx) error) error
- type DriverOptions
- type Reader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildIncrementalQuery ¶ added in v0.1.9
BuildIncrementalQuery generates the incremental query SQL based on driver type
func DB2ApproxRowCountQuery ¶ added in v0.3.14
func DB2ApproxRowCountQuery(stream types.StreamInterface) string
DB2ApproxRowCountQuery uses generic CARD from SYSCAT.TABLES (CARD is approx).
func DB2AvgRowSizeQuery ¶ added in v0.3.14
func DB2AvgRowSizeQuery(stream types.StreamInterface) string
DB2AvgRowSizeQuery returns the query to fetch the average row size for a table in DB2
func DB2DiscoveryQuery ¶ added in v0.3.14
func DB2DiscoveryQuery() string
DB2DiscoveryQuery returns the query to discover tables in a DB2 database with filter for 'T' (Table) and 'V' (View).
func DB2MinMaxPKQuery ¶ added in v0.3.14
func DB2MinMaxPKQuery(stream types.StreamInterface, columns []string) string
DB2MinMaxPKQuery returns the query to fetch min/max values of a column
func DB2MinMaxRidQuery ¶ added in v0.3.14
func DB2MinMaxRidQuery(stream types.StreamInterface) string
DB2MinMaxRidQuery to find the min/max of RIDs for chunking
func DB2NextChunkEndQuery ¶ added in v0.3.14
func DB2NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
DB2NextChunkEndQuery returns the query to calculate the next chunk boundary for DB2
func DB2PKChunkScanQuery ¶ added in v0.3.14
func DB2PKChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string
DB2PKChunkScanQuery builds a chunk scan query for DB2 using Primary Keys
func DB2PageStatsQuery ¶ added in v0.3.14
func DB2PageStatsQuery(stream types.StreamInterface) string
DB2PageStatsQuery returns the query to fetch the page size and number of pages for a table
func DB2RidChunkScanQuery ¶ added in v0.3.14
DB2RidChunkScanQuery returns the query to fetch rows for a specific chunk using RID
func DB2TableSchemaAndPrimaryKeysQuery ¶ added in v0.3.14
func DB2TableSchemaAndPrimaryKeysQuery() string
DB2TableSchemaAndPrimaryKeysQuery returns the query to fetch columns for a specific table
func DB2TableStatsExistQuery ¶ added in v0.3.14
func DB2TableStatsExistQuery(stream types.StreamInterface) string
DB2TableStatsExistQuery returns the query to check if a table exists in DB2
func GetMaxCursorValues ¶ added in v0.2.7
func GetPlaceholder ¶ added in v0.2.7
func GetPlaceholder(driver constants.DriverType) func(int) string
GetPlaceholder returns the appropriate placeholder for the given driver
func IncrementalValueFormatter ¶ added in v0.3.14
func IncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, lastCursorValue any, opts DriverOptions) (string, any, error)
IncrementalValueFormatter is used to format the value of the cursor field for incremental sync, mainly because of the various timestamp formats
func MSSQLCDCAdvanceLSNQuery ¶ added in v0.3.15
func MSSQLCDCAdvanceLSNQuery() string
MSSQLCDCAdvanceLSNQuery returns the query to increment an LSN for CDC
func MSSQLCDCCaptureJobConfigQuery ¶ added in v0.5.1
func MSSQLCDCCaptureJobConfigQuery() string
MSSQLCDCCaptureJobConfigQuery returns maxtrans and pollinginterval settings for the CDC capture job.
func MSSQLCDCCreateCaptureInstanceQuery ¶ added in v0.4.2
func MSSQLCDCCreateCaptureInstanceQuery() string
MSSQLCDCCreateCaptureInstanceQuery returns the query to create a new CDC capture instance
func MSSQLCDCDisableCaptureInstanceQuery ¶ added in v0.4.2
func MSSQLCDCDisableCaptureInstanceQuery() string
MSSQLCDCDisableCaptureInstanceQuery returns the query to disable a CDC capture instance
func MSSQLCDCDiscoverQuery ¶ added in v0.3.15
MSSQLCDCDiscoverQuery returns the query to discover CDC-enabled capture instances
func MSSQLCDCGetChangesQuery ¶ added in v0.3.15
MSSQLCDCGetChangesQuery returns the query to fetch CDC changes for a capture instance
func MSSQLCDCGetDDLHistoryBulkQuery ¶ added in v0.4.2
MSSQLCDCGetDDLHistoryBulkQuery returns the query to fetch DDL history events for multiple tables
func MSSQLCDCLatestScanSessionQuery ¶ added in v0.5.1
func MSSQLCDCLatestScanSessionQuery() string
MSSQLCDCLatestScanSessionQuery returns the latest completed CDC log scan session. Requires VIEW DATABASE STATE permission.
func MSSQLCDCMaxLSNQuery ¶ added in v0.3.15
func MSSQLCDCMaxLSNQuery() string
TODO: check about `sys.fn_cdc_get_min_lsn` MSSQLCDCMaxLSNQuery returns the query to fetch the current maximum LSN for CDC
func MSSQLCDCSupportQuery ¶ added in v0.3.15
func MSSQLCDCSupportQuery() string
MSSQLCDCSupportQuery returns the query to check if CDC is enabled for the current database
func MSSQLCDCTableEnabledQuery ¶ added in v0.3.15
func MSSQLCDCTableEnabledQuery() string
MSSQLCDCTableEnabledQuery returns the query to check if CDC is enabled for a specific table
func MSSQLChunkScanQuery ¶ added in v0.3.15
func MSSQLChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, filter string) string
MSSQLChunkScanQuery returns the SQL query for scanning a chunk in MSSQL
func MSSQLColumnTypeQuery ¶ added in v0.4.2
func MSSQLColumnTypeQuery() string
MSSQLColumnTypeQuery returns data type query for a single table column.
func MSSQLDiscoverTablesQuery ¶ added in v0.3.15
func MSSQLDiscoverTablesQuery() string
MSSQLDiscoverTablesQuery returns the query to discover tables in a MSSQL database
func MSSQLNextChunkEndQuery ¶ added in v0.3.15
func MSSQLNextChunkEndQuery(stream types.StreamInterface, orderingColumns []string, chunkSize int64) string
MSSQLNextChunkEndQuery returns the query to calculate the next chunk boundary.
Example:
stream.Namespace() = "dbo"
stream.Name() = "orders"
orderingColumns = []string{"order_id", "item_id"}
chunkSize = 1000
Conceptual output:
WITH ordered AS (
SELECT
<concat(order_id, item_id)> AS key_str,
ROW_NUMBER() OVER (ORDER BY [order_id], [item_id]) AS rn
FROM [dbo].[orders]
WHERE
([order_id] > @p1)
OR ([order_id] = @p1 AND [item_id] > @p2)
)
SELECT key_str FROM ordered WHERE rn = 1000;
func MSSQLPhysLocChunkScanQuery ¶ added in v0.3.15
func MSSQLPhysLocChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
MSSQLPhysLocChunkScanQuery returns the SQL query for scanning a chunk using %%physloc%% in MSSQL
func MSSQLPhysLocExtremesQuery ¶ added in v0.3.15
func MSSQLPhysLocExtremesQuery(stream types.StreamInterface) string
MSSQLPhysLocExtremesQuery returns the query to fetch MIN and MAX %%physloc%% values for a table
func MSSQLPhysLocNextChunkEndQuery ¶ added in v0.3.15
func MSSQLPhysLocNextChunkEndQuery(stream types.StreamInterface, chunkSize int64) string
MSSQLPhysLocNextChunkEndQuery returns the query to find the next %%physloc%% chunk boundary
func MSSQLTableExistsQuery ¶ added in v0.3.15
func MSSQLTableExistsQuery(stream types.StreamInterface) string
MSSQLTableExistsQuery returns the query to check if a table has any rows
func MSSQLTableRowStatsQuery ¶ added in v0.3.15
func MSSQLTableRowStatsQuery() string
MSSQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MSSQL
func MSSQLTableSchemaQuery ¶ added in v0.3.15
func MSSQLTableSchemaQuery() string
MSSQLTableSchemaQuery returns the query to fetch the column_name, data_type, nullable, and primary_key information of a table in MSSQL
func MSSQLViewDatabaseStatePermissionQuery ¶ added in v0.5.1
func MSSQLViewDatabaseStatePermissionQuery() string
MSSQLViewDatabaseStatePermissionQuery checks for VIEW DATABASE STATE (all versions) or VIEW DATABASE PERFORMANCE STATE (SQL Server 2022+). Either permission grants access to sys.dm_cdc_log_scan_sessions.
func MapScan ¶
func MapScan(rows *sql.Rows, dest map[string]any, converter func(value interface{}, columnType string) (interface{}, error)) error
TODO: Use MapScanConcurrent instead of MapScan for incremental as well
func MapScanConcurrent ¶ added in v0.3.14
func MinMaxQuery ¶
func MinMaxQuery(stream types.StreamInterface, column string) string
MinMaxQuery returns the query to fetch MIN and MAX values of a column in a Postgres table
func MinMaxQueryMSSQL ¶ added in v0.3.15
func MinMaxQueryMSSQL(stream types.StreamInterface, columns []string) string
func MinMaxQueryMySQL ¶
func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string
MinMaxQueryMySQL returns the query to fetch MIN and MAX values of a column in a MySQL table
func MySQLBinlogFormatQuery ¶ added in v0.1.7
func MySQLBinlogFormatQuery() string
MySQLBinlogFormatQuery returns the query to fetch the binlog_format variable in MySQL
func MySQLBinlogRowMetadataQuery ¶ added in v0.1.7
func MySQLBinlogRowMetadataQuery() string
MySQLBinlogRowMetadataQuery returns the query to fetch the binlog_row_metadata variable in MySQL
func MySQLDiscoverTablesQuery ¶
func MySQLDiscoverTablesQuery() string
MySQLDiscoverTablesQuery returns the query to discover tables in a MySQL database
func MySQLLogBinQuery ¶ added in v0.1.7
func MySQLLogBinQuery() string
MySQLLogBinQuery returns the query to fetch the log_bin variable in MySQL
func MySQLMasterStatusQuery ¶
func MySQLMasterStatusQuery() string
MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.3 and below
func MySQLMasterStatusQueryNew ¶
func MySQLMasterStatusQueryNew() string
MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.4 and above
func MySQLPrimaryKeyQuery ¶
func MySQLPrimaryKeyQuery() string
MySQLPrimaryKeyQuery returns the query to fetch the primary key column of a table in MySQL
func MySQLTableColumnsQuery ¶
func MySQLTableColumnsQuery() string
MySQLTableColumnsQuery returns the query to fetch column names of a table in MySQL
func MySQLTableExistsQuery ¶ added in v0.2.4
func MySQLTableExistsQuery(stream types.StreamInterface) string
MySQLTableExistsQuery returns the query to check if a table has any rows using EXISTS
func MySQLTableRowStatsQuery ¶ added in v0.1.9
func MySQLTableRowStatsQuery() string
MySQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL
func MySQLTableSchemaQuery ¶
func MySQLTableSchemaQuery() string
MySQLTableSchemaQuery returns the query to fetch schema information for a table in MySQL
func MySQLTimeZoneQuery ¶ added in v0.3.16
func MySQLTimeZoneQuery() string
MySQLTimeZoneQuery returns the query to fetch the timezone of the MySQL server and system timezone
func MySQLVersion ¶
MySQLVersion returns the version of the MySQL server It returns the flavor, major and minor version of the MySQL server
func MysqlChunkScanQuery ¶
func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string
MySQLWithoutState builds a chunk scan query for MySql
func MysqlLimitOffsetScanQuery ¶
func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string
MysqlLimitOffsetScanQuery is used to get the rows
func NextChunkEndQuery ¶
func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string
NextChunkEndQuery returns the query to calculate the next chunk boundary Example: Input:
stream.Namespace() = "mydb"
stream.Name() = "users"
columns = []string{"id", "created_at"}
chunkSize = 1000
Output:
SELECT CONCAT_WS(',', id, created_at) AS key_str FROM (
SELECT (id, created_at)
FROM `mydb`.`users`
WHERE (`id` > ?) OR (`id` = ? AND `created_at` > ?)
ORDER BY id, created_at
LIMIT 1 OFFSET 1000
) AS subquery
func NextRowIDQuery ¶ added in v0.1.3
func NextRowIDQuery(stream types.StreamInterface, ROWID string, chunkSize int64) string
NextRowIDQuery returns the query to fetch the next max row id
func OracleBlockSizeQuery ¶ added in v0.1.6
func OracleBlockSizeQuery() string
OracleBlockSizeQuery returns the query to fetch the size of a block in bytes in OracleDB
func OracleChunkCreationQuery ¶ added in v0.1.6
func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string
OracleChunkCreationQuery returns the query to make chunks in OracleDB using DBMS_PARALLEL_EXECUTE
func OracleChunkRetrievalQuery ¶ added in v0.1.6
OracleChunkRetrievalQuery returns the query to retrieve chunks from DBMS_PARALLEL_EXECUTE in OracleDB
func OracleChunkScanQuery ¶ added in v0.1.3
func OracleChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) (string, error)
OracleChunkScanQuery returns the query to fetch the rows of a table in OracleDB. Both chunk.Min and chunk.Max nil is invalid and returns an error.
func OracleChunkTaskCleanerQuery ¶ added in v0.1.6
OracleChunkTaskCleanerQuery returns the query to clean up a chunk task in OracleDB
func OracleColumnDataTypeQuery ¶ added in v0.6.0
OracleColumnDataTypeQuery returns the query to fetch the data type of a column in OracleDB
func OracleEmptyCheckQuery ¶ added in v0.1.3
func OracleEmptyCheckQuery(stream types.StreamInterface) string
OracleEmptyCheckQuery returns the query to check if a table is empty in OracleDB
func OracleMinMaxRowIDQuery ¶ added in v0.5.1
func OracleMinMaxRowIDQuery(stream types.StreamInterface) string
OracleMinMaxRowIDQuery returns the query to fetch the min and max row id of a table in OracleDB
func OraclePrimaryKeyColummsQuery ¶ added in v0.1.3
OraclePrimaryKeyQuery returns the query to fetch all the primary key columns of a table in OracleDB
func OracleTableDetailsQuery ¶ added in v0.1.3
OracleTableDetailsQuery returns the query to fetch the details of a table in OracleDB
func OracleTableDiscoveryQuery ¶ added in v0.1.6
func OracleTableDiscoveryQuery() string
OracleTableDiscoveryQuery returns the query to fetch the username and table name of all the tables which the current user has access to in OracleDB
func OracleTableRowStatsQuery ¶ added in v0.2.9
func OracleTableRowStatsQuery() string
OracleTableRowStatsQuery returns the query to fetch the estimated row count of a table in Oracle
func OracleTaskCreationQuery ¶ added in v0.1.6
OracleTaskCreationQuery returns the query to create a task in OracleDB
func PostgresBlockSizeQuery ¶ added in v0.1.9
func PostgresBlockSizeQuery() string
PostgresBlockSizeQuery returns the query to fetch the block size in PostgreSQL
func PostgresChunkScanQuery ¶
func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, filter string) string
PostgresBuildSplitScanQuery builds a chunk scan query for PostgreSQL
func PostgresIsPartitionedQuery ¶ added in v0.2.9
func PostgresIsPartitionedQuery(stream types.StreamInterface) string
PostgresIsPartitionedQuery returns a SQL query that checks whether a table is partitioned. It counts how many partitions exist under the given parent table in the specified schema.
func PostgresNextChunkEndQuery ¶
func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string
PostgresNextChunkEndQuery generates a SQL query to fetch the maximum value of a specified column
func PostgresPartitionPages ¶ added in v0.2.9
func PostgresPartitionPages(stream types.StreamInterface) string
TODO:check this query might be more optimized for performance PostgresPartitionPages returns leaf-partition page counts using a recursive CTE over pg_inherits. Works on all Postgres versions (10+); used as fallback for PG < 12.
func PostgresPartitionPagesPG12 ¶ added in v0.6.3
func PostgresPartitionPagesPG12(stream types.StreamInterface) string
PostgresPartitionPagesPG12 returns leaf-partition page counts using pg_partition_tree (PG 12+). Intermediate partitions are excluded via isleaf=true; works for any partition depth.
func PostgresRelPageCount ¶
func PostgresRelPageCount(stream types.StreamInterface) string
PostgresRelPageCount returns the query to fetch relation page count in PostgreSQL
func PostgresRowCountQuery ¶
func PostgresRowCountQuery(stream types.StreamInterface) string
PostgreSQL-Specific Queries TODO: Rewrite queries for taking vars as arguments while execution. PostgresRowCountQuery returns the query to fetch the estimated row count in PostgreSQL
func PostgresServerVersionNum ¶ added in v0.6.3
func PostgresServerVersionNum() string
PostgresServerVersionNum returns the server version as an integer (e.g. PG 14.5 → 140005).
func PostgresWalLSNQuery ¶
func PostgresWalLSNQuery() string
PostgresWalLSNQuery returns the query to fetch the current WAL LSN in PostgreSQL
func QuoteColumns ¶ added in v0.2.0
func QuoteColumns(columns []string, driver constants.DriverType) []string
QuoteColumns returns a slice of quoted column names
func QuoteIdentifier ¶ added in v0.2.0
func QuoteIdentifier(identifier string, driver constants.DriverType) string
QuoteIdentifier returns the properly quoted identifier based on database driver
func QuoteTable ¶ added in v0.2.0
func QuoteTable(schema, table string, driver constants.DriverType) string
QuoteTable returns the properly quoted schema.table combination
func SQLFilter ¶ added in v0.1.6
ParseFilter converts a filter string to a valid SQL WHERE condition, also appends the threshold filter if present
func ThresholdFilter ¶ added in v0.2.7
ThresholdFilter is used to update the filter for initial run of incremental sync during backfill. This is to avoid dupliction of records, as max cursor value is fetched before the chunk creation.
Types ¶
type DriverOptions ¶ added in v0.2.7
type DriverOptions struct {
Driver constants.DriverType
Stream types.StreamInterface
State *types.State
Client *sqlx.DB
}
DriverOptions contains options for creating various queries