jdbc

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildIncrementalQuery added in v0.1.9

func BuildIncrementalQuery(ctx context.Context, opts DriverOptions) (string, []any, error)

BuildIncrementalQuery generates the incremental query SQL based on driver type

func DB2ApproxRowCountQuery added in v0.3.14

func DB2ApproxRowCountQuery(stream types.StreamInterface) string

DB2ApproxRowCountQuery uses generic CARD from SYSCAT.TABLES (CARD is approx).

func DB2AvgRowSizeQuery added in v0.3.14

func DB2AvgRowSizeQuery(stream types.StreamInterface) string

DB2AvgRowSizeQuery returns the query to fetch the average row size for a table in DB2

func DB2DiscoveryQuery added in v0.3.14

func DB2DiscoveryQuery() string

DB2DiscoveryQuery returns the query to discover tables in a DB2 database with filter for 'T' (Table) and 'V' (View).

func DB2MinMaxPKQuery added in v0.3.14

func DB2MinMaxPKQuery(stream types.StreamInterface, columns []string) string

DB2MinMaxPKQuery returns the query to fetch min/max values of a column

func DB2MinMaxRidQuery added in v0.3.14

func DB2MinMaxRidQuery(stream types.StreamInterface) string

DB2MinMaxRidQuery to find the min/max of RIDs for chunking

func DB2NextChunkEndQuery added in v0.3.14

func DB2NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string

DB2NextChunkEndQuery returns the query to calculate the next chunk boundary for DB2

func DB2PKChunkScanQuery added in v0.3.14

func DB2PKChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string

DB2PKChunkScanQuery builds a chunk scan query for DB2 using Primary Keys

func DB2PageStatsQuery added in v0.3.14

func DB2PageStatsQuery(stream types.StreamInterface) string

DB2PageStatsQuery returns the query to fetch the page size and number of pages for a table

func DB2RidChunkScanQuery added in v0.3.14

func DB2RidChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string

DB2RidChunkScanQuery returns the query to fetch rows for a specific chunk using RID

func DB2TableSchemaAndPrimaryKeysQuery added in v0.3.14

func DB2TableSchemaAndPrimaryKeysQuery() string

DB2TableSchemaAndPrimaryKeysQuery returns the query to fetch columns for a specific table

func DB2TableStatsExistQuery added in v0.3.14

func DB2TableStatsExistQuery(stream types.StreamInterface) string

DB2TableStatsExistQuery returns the query to check if a table exists in DB2

func GetMaxCursorValues added in v0.2.7

func GetMaxCursorValues(ctx context.Context, client *sqlx.DB, driverType constants.DriverType, stream types.StreamInterface) (any, any, error)

func GetPlaceholder added in v0.2.7

func GetPlaceholder(driver constants.DriverType) func(int) string

GetPlaceholder returns the appropriate placeholder for the given driver

func IncrementalValueFormatter added in v0.3.14

func IncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, lastCursorValue any, opts DriverOptions) (string, any, error)

IncrementalValueFormatter is used to format the value of the cursor field for incremental sync, mainly because of the various timestamp formats

func MSSQLCDCAdvanceLSNQuery added in v0.3.15

func MSSQLCDCAdvanceLSNQuery() string

MSSQLCDCAdvanceLSNQuery returns the query to increment an LSN for CDC

func MSSQLCDCDiscoverQuery added in v0.3.15

func MSSQLCDCDiscoverQuery(streamID string) string

MSSQLCDCDiscoverQuery returns the query to discover CDC-enabled capture instances

func MSSQLCDCGetChangesQuery added in v0.3.15

func MSSQLCDCGetChangesQuery(captureInstance string) string

MSSQLCDCGetChangesQuery returns the query to fetch CDC changes for a capture instance

func MSSQLCDCMaxLSNQuery added in v0.3.15

func MSSQLCDCMaxLSNQuery() string

TODO: check about `sys.fn_cdc_get_min_lsn` MSSQLCDCMaxLSNQuery returns the query to fetch the current maximum LSN for CDC

func MSSQLCDCSupportQuery added in v0.3.15

func MSSQLCDCSupportQuery() string

MSSQLCDCSupportQuery returns the query to check if CDC is enabled for the current database

func MSSQLCDCTableEnabledQuery added in v0.3.15

func MSSQLCDCTableEnabledQuery() string

MSSQLCDCTableEnabledQuery returns the query to check if CDC is enabled for a specific table

func MSSQLChunkScanQuery added in v0.3.15

func MSSQLChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, filter string) string

MSSQLChunkScanQuery returns the SQL query for scanning a chunk in MSSQL

func MSSQLDiscoverTablesQuery added in v0.3.15

func MSSQLDiscoverTablesQuery() string

MSSQLDiscoverTablesQuery returns the query to discover tables in a MSSQL database

func MSSQLNextChunkEndQuery added in v0.3.15

func MSSQLNextChunkEndQuery(stream types.StreamInterface, orderingColumns []string, chunkSize int64) string

MSSQLNextChunkEndQuery returns the query to calculate the next chunk boundary.

Example:

stream.Namespace()   = "dbo"
stream.Name()        = "orders"
orderingColumns      = []string{"order_id", "item_id"}
chunkSize            = 1000

Conceptual output:

WITH ordered AS (
  SELECT
    <concat(order_id, item_id)> AS key_str,
    ROW_NUMBER() OVER (ORDER BY [order_id], [item_id]) AS rn
  FROM [dbo].[orders]
  WHERE
    ([order_id] > @p1)
     OR ([order_id] = @p1 AND [item_id] > @p2)
)
SELECT key_str FROM ordered WHERE rn = 1000;

func MSSQLPhysLocChunkScanQuery added in v0.3.15

func MSSQLPhysLocChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string

MSSQLPhysLocChunkScanQuery returns the SQL query for scanning a chunk using %%physloc%% in MSSQL

func MSSQLPhysLocExtremesQuery added in v0.3.15

func MSSQLPhysLocExtremesQuery(stream types.StreamInterface) string

MSSQLPhysLocExtremesQuery returns the query to fetch MIN and MAX %%physloc%% values for a table

func MSSQLPhysLocNextChunkEndQuery added in v0.3.15

func MSSQLPhysLocNextChunkEndQuery(stream types.StreamInterface, chunkSize int64) string

MSSQLPhysLocNextChunkEndQuery returns the query to find the next %%physloc%% chunk boundary

func MSSQLTableExistsQuery added in v0.3.15

func MSSQLTableExistsQuery(stream types.StreamInterface) string

MSSQLTableExistsQuery returns the query to check if a table has any rows

func MSSQLTableRowStatsQuery added in v0.3.15

func MSSQLTableRowStatsQuery() string

MSSQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MSSQL

func MSSQLTableSchemaQuery added in v0.3.15

func MSSQLTableSchemaQuery() string

MSSQLTableSchemaQuery returns the query to fetch the column_name, data_type, nullable, and primary_key information of a table in MSSQL

func MapScan

func MapScan(rows *sql.Rows, dest map[string]any, converter func(value interface{}, columnType string) (interface{}, error)) error

TODO: Use MapScanConcurrent instead of MapScan for incremental as well

func MapScanConcurrent added in v0.3.14

func MapScanConcurrent(setter *Reader[*sql.Rows], converter func(value interface{}, columnType string) (interface{}, error), OnMessage abstract.BackfillMsgFn) error

func MinMaxQuery

func MinMaxQuery(stream types.StreamInterface, column string) string

MinMaxQuery returns the query to fetch MIN and MAX values of a column in a Postgres table

func MinMaxQueryMSSQL added in v0.3.15

func MinMaxQueryMSSQL(stream types.StreamInterface, columns []string) string

func MinMaxQueryMySQL

func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string

MinMaxQueryMySQL returns the query to fetch MIN and MAX values of a column in a MySQL table

func MySQLBinlogFormatQuery added in v0.1.7

func MySQLBinlogFormatQuery() string

MySQLBinlogFormatQuery returns the query to fetch the binlog_format variable in MySQL

func MySQLBinlogRowMetadataQuery added in v0.1.7

func MySQLBinlogRowMetadataQuery() string

MySQLBinlogRowMetadataQuery returns the query to fetch the binlog_row_metadata variable in MySQL

func MySQLDiscoverTablesQuery

func MySQLDiscoverTablesQuery() string

MySQLDiscoverTablesQuery returns the query to discover tables in a MySQL database

func MySQLLogBinQuery added in v0.1.7

func MySQLLogBinQuery() string

MySQLLogBinQuery returns the query to fetch the log_bin variable in MySQL

func MySQLMasterStatusQuery

func MySQLMasterStatusQuery() string

MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.3 and below

func MySQLMasterStatusQueryNew

func MySQLMasterStatusQueryNew() string

MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.4 and above

func MySQLPrimaryKeyQuery

func MySQLPrimaryKeyQuery() string

MySQLPrimaryKeyQuery returns the query to fetch the primary key column of a table in MySQL

func MySQLTableColumnsQuery

func MySQLTableColumnsQuery() string

MySQLTableColumnsQuery returns the query to fetch column names of a table in MySQL

func MySQLTableExistsQuery added in v0.2.4

func MySQLTableExistsQuery(stream types.StreamInterface) string

MySQLTableExistsQuery returns the query to check if a table has any rows using EXISTS

func MySQLTableRowStatsQuery added in v0.1.9

func MySQLTableRowStatsQuery() string

MySQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL

func MySQLTableSchemaQuery

func MySQLTableSchemaQuery() string

MySQLTableSchemaQuery returns the query to fetch schema information for a table in MySQL

func MySQLTimeZoneQuery added in v0.3.16

func MySQLTimeZoneQuery() string

MySQLTimeZoneQuery returns the query to fetch the timezone of the MySQL server and system timezone

func MySQLVersion

func MySQLVersion(ctx context.Context, client *sqlx.DB) (string, int, int, error)

MySQLVersion returns the version of the MySQL server It returns the flavor, major and minor version of the MySQL server

func MysqlChunkScanQuery

func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string

MySQLWithoutState builds a chunk scan query for MySql

func MysqlLimitOffsetScanQuery

func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string

MysqlLimitOffsetScanQuery is used to get the rows

func NextChunkEndQuery

func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string

NextChunkEndQuery returns the query to calculate the next chunk boundary Example: Input:

stream.Namespace() = "mydb"
stream.Name() = "users"
columns = []string{"id", "created_at"}
chunkSize = 1000

Output:

SELECT CONCAT_WS(',', id, created_at) AS key_str FROM (
  SELECT (id, created_at)
  FROM `mydb`.`users`
  WHERE (`id` > ?) OR (`id` = ? AND `created_at` > ?)
  ORDER BY id, created_at
  LIMIT 1 OFFSET 1000
) AS subquery

func OracleBlockSizeQuery added in v0.1.6

func OracleBlockSizeQuery() string

OracleTableSizeQuery returns the query to fetch the size of a table in bytes in OracleDB

func OracleChunkCreationQuery added in v0.1.6

func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string

OracleChunkCreationQuery returns the query to make chunks in OracleDB using DBMS_PARALLEL_EXECUTE

func OracleChunkRetrievalQuery added in v0.1.6

func OracleChunkRetrievalQuery(taskName string) string

OracleChunkRetrievalQuery returns the query to retrieve chunks from DBMS_PARALLEL_EXECUTE in OracleDB

func OracleChunkScanQuery added in v0.1.3

func OracleChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string

OracleChunkScanQuery returns the query to fetch the rows of a table in OracleDB

func OracleChunkTaskCleanerQuery added in v0.1.6

func OracleChunkTaskCleanerQuery(taskName string) string

OracleChunkTaskCleanerQuery returns the query to clean up a chunk task in OracleDB

func OracleEmptyCheckQuery added in v0.1.3

func OracleEmptyCheckQuery(stream types.StreamInterface) string

OracleEmptyCheckQuery returns the query to check if a table is empty in OracleDB

func OraclePrimaryKeyColummsQuery added in v0.1.3

func OraclePrimaryKeyColummsQuery(schemaName, tableName string) string

OraclePrimaryKeyQuery returns the query to fetch all the primary key columns of a table in OracleDB

func OracleTableDetailsQuery added in v0.1.3

func OracleTableDetailsQuery(schemaName, tableName string) string

OracleTableDetailsQuery returns the query to fetch the details of a table in OracleDB

func OracleTableDiscoveryQuery added in v0.1.6

func OracleTableDiscoveryQuery() string

OracleTableDiscoveryQuery returns the query to fetch the username and table name of all the tables which the current user has access to in OracleDB

func OracleTableRowStatsQuery added in v0.2.9

func OracleTableRowStatsQuery() string

OracleTableRowStatsQuery returns the query to fetch the estimated row count of a table in Oracle

func OracleTaskCreationQuery added in v0.1.6

func OracleTaskCreationQuery(taskName string) string

OracleTaskCreationQuery returns the query to create a task in OracleDB

func PostgresBlockSizeQuery added in v0.1.9

func PostgresBlockSizeQuery() string

PostgresBlockSizeQuery returns the query to fetch the block size in PostgreSQL

func PostgresChunkScanQuery

func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, filter string) string

PostgresBuildSplitScanQuery builds a chunk scan query for PostgreSQL

func PostgresIsPartitionedQuery added in v0.2.9

func PostgresIsPartitionedQuery(stream types.StreamInterface) string

PostgresIsPartitionedQuery returns a SQL query that checks whether a table is partitioned. It counts how many partitions exist under the given parent table in the specified schema.

func PostgresNextChunkEndQuery

func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string

PostgresNextChunkEndQuery generates a SQL query to fetch the maximum value of a specified column

func PostgresPartitionPages added in v0.2.9

func PostgresPartitionPages(stream types.StreamInterface) string

PostgresPartitionPages returns total relpages for each partition and the parent table. This can be used to dynamically adjust chunk sizes based on partition distribution.

func PostgresRelPageCount

func PostgresRelPageCount(stream types.StreamInterface) string

PostgresRelPageCount returns the query to fetch relation page count in PostgreSQL

func PostgresRowCountQuery

func PostgresRowCountQuery(stream types.StreamInterface) string

PostgreSQL-Specific Queries TODO: Rewrite queries for taking vars as arguments while execution. PostgresRowCountQuery returns the query to fetch the estimated row count in PostgreSQL

func PostgresWalLSNQuery

func PostgresWalLSNQuery() string

PostgresWalLSNQuery returns the query to fetch the current WAL LSN in PostgreSQL

func QuoteColumns added in v0.2.0

func QuoteColumns(columns []string, driver constants.DriverType) []string

QuoteColumns returns a slice of quoted column names

func QuoteIdentifier added in v0.2.0

func QuoteIdentifier(identifier string, driver constants.DriverType) string

QuoteIdentifier returns the properly quoted identifier based on database driver

func QuoteTable added in v0.2.0

func QuoteTable(schema, table string, driver constants.DriverType) string

QuoteTable returns the properly quoted schema.table combination

func SQLFilter added in v0.1.6

func SQLFilter(stream types.StreamInterface, driver string, thresholdFilter string) (string, error)

ParseFilter converts a filter string to a valid SQL WHERE condition, also appends the threshold filter if present

func ThresholdFilter added in v0.2.7

func ThresholdFilter(ctx context.Context, opts DriverOptions) (string, []any, error)

ThresholdFilter is used to update the filter for initial run of incremental sync during backfill. This is to avoid dupliction of records, as max cursor value is fetched before the chunk creation.

func WithIsolation

func WithIsolation(ctx context.Context, client *sqlx.DB, readOnly bool, fn func(tx *sql.Tx) error) error

Types

type DriverOptions added in v0.2.7

type DriverOptions struct {
	Driver constants.DriverType
	Stream types.StreamInterface
	State  *types.State
	Client *sqlx.DB
}

DriverOptions contains options for creating various queries

type Reader

type Reader[T types.Iterable] struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader[T types.Iterable](ctx context.Context, baseQuery string,
	exec func(ctx context.Context, query string, args ...any) (T, error), args ...any) *Reader[T]

func (*Reader[T]) Capture

func (o *Reader[T]) Capture(onCapture func(T) error) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL