jdbc

package
v0.3.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildIncrementalQuery added in v0.1.9

func BuildIncrementalQuery(ctx context.Context, opts DriverOptions) (string, []any, error)

BuildIncrementalQuery generates the incremental query SQL based on driver type

func GetMaxCursorValues added in v0.2.7

func GetMaxCursorValues(ctx context.Context, client *sqlx.DB, driverType constants.DriverType, stream types.StreamInterface) (any, any, error)

func GetPlaceholder added in v0.2.7

func GetPlaceholder(driver constants.DriverType) func(int) string

GetPlaceholder returns the appropriate placeholder for the given driver

func MapScan

func MapScan(rows *sql.Rows, dest map[string]any, converter func(value interface{}, columnType string) (interface{}, error)) error

func MinMaxQuery

func MinMaxQuery(stream types.StreamInterface, column string) string

MinMaxQuery returns the query to fetch MIN and MAX values of a column in a Postgres table

func MinMaxQueryMySQL

func MinMaxQueryMySQL(stream types.StreamInterface, columns []string) string

MinMaxQueryMySQL returns the query to fetch MIN and MAX values of a column in a MySQL table

func MySQLBinlogFormatQuery added in v0.1.7

func MySQLBinlogFormatQuery() string

MySQLBinlogFormatQuery returns the query to fetch the binlog_format variable in MySQL

func MySQLBinlogRowMetadataQuery added in v0.1.7

func MySQLBinlogRowMetadataQuery() string

MySQLBinlogRowMetadataQuery returns the query to fetch the binlog_row_metadata variable in MySQL

func MySQLDiscoverTablesQuery

func MySQLDiscoverTablesQuery() string

MySQLDiscoverTablesQuery returns the query to discover tables in a MySQL database

func MySQLLogBinQuery added in v0.1.7

func MySQLLogBinQuery() string

MySQLLogBinQuery returns the query to fetch the log_bin variable in MySQL

func MySQLMasterStatusQuery

func MySQLMasterStatusQuery() string

MySQLMasterStatusQuery returns the query to fetch the current binlog position in MySQL: mysql v8.3 and below

func MySQLMasterStatusQueryNew

func MySQLMasterStatusQueryNew() string

MySQLMasterStatusQueryNew returns the query to fetch the current binlog position in MySQL: mysql v8.4 and above

func MySQLPrimaryKeyQuery

func MySQLPrimaryKeyQuery() string

MySQLPrimaryKeyQuery returns the query to fetch the primary key column of a table in MySQL

func MySQLTableColumnsQuery

func MySQLTableColumnsQuery() string

MySQLTableColumnsQuery returns the query to fetch column names of a table in MySQL

func MySQLTableExistsQuery added in v0.2.4

func MySQLTableExistsQuery(stream types.StreamInterface) string

MySQLTableExistsQuery returns the query to check if a table has any rows using EXISTS

func MySQLTableRowStatsQuery added in v0.1.9

func MySQLTableRowStatsQuery() string

MySQLTableRowStatsQuery returns the query to fetch the estimated row count and average row size of a table in MySQL

func MySQLTableSchemaQuery

func MySQLTableSchemaQuery() string

MySQLTableSchemaQuery returns the query to fetch schema information for a table in MySQL

func MySQLVersion

func MySQLVersion(ctx context.Context, client *sqlx.DB) (string, int, int, error)

MySQLVersion returns the version of the MySQL server. It returns the flavor, major and minor version of the MySQL server.

func MysqlChunkScanQuery

func MysqlChunkScanQuery(stream types.StreamInterface, filterColumns []string, chunk types.Chunk, extraFilter string) string

MysqlChunkScanQuery builds a chunk scan query for MySQL

func MysqlLimitOffsetScanQuery

func MysqlLimitOffsetScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string

MysqlLimitOffsetScanQuery is used to get the rows

func NextChunkEndQuery

func NextChunkEndQuery(stream types.StreamInterface, columns []string, chunkSize int64) string

NextChunkEndQuery returns the query to calculate the next chunk boundary. Example: Input:

stream.Namespace() = "mydb"
stream.Name() = "users"
columns = []string{"id", "created_at"}
chunkSize = 1000

Output:

SELECT CONCAT_WS(',', id, created_at) AS key_str FROM (
  SELECT id, created_at
  FROM `mydb`.`users`
  WHERE (`id` > ?) OR (`id` = ? AND `created_at` > ?)
  ORDER BY id, created_at
  LIMIT 1 OFFSET 1000
) AS subquery

func OracleBlockSizeQuery added in v0.1.6

func OracleBlockSizeQuery() string

OracleBlockSizeQuery returns the query to fetch the size of a table in bytes in OracleDB

func OracleChunkCreationQuery added in v0.1.6

func OracleChunkCreationQuery(stream types.StreamInterface, blocksPerChunk int64, taskName string) string

OracleChunkCreationQuery returns the query to make chunks in OracleDB using DBMS_PARALLEL_EXECUTE

func OracleChunkRetrievalQuery added in v0.1.6

func OracleChunkRetrievalQuery(taskName string) string

OracleChunkRetrievalQuery returns the query to retrieve chunks from DBMS_PARALLEL_EXECUTE in OracleDB

func OracleChunkScanQuery added in v0.1.3

func OracleChunkScanQuery(stream types.StreamInterface, chunk types.Chunk, filter string) string

OracleChunkScanQuery returns the query to fetch the rows of a table in OracleDB

func OracleChunkTaskCleanerQuery added in v0.1.6

func OracleChunkTaskCleanerQuery(taskName string) string

OracleChunkTaskCleanerQuery returns the query to clean up a chunk task in OracleDB

func OracleEmptyCheckQuery added in v0.1.3

func OracleEmptyCheckQuery(stream types.StreamInterface) string

OracleEmptyCheckQuery returns the query to check if a table is empty in OracleDB

func OracleIncrementalValueFormatter added in v0.1.9

func OracleIncrementalValueFormatter(ctx context.Context, cursorField, argumentPlaceholder string, isBackfill bool, lastCursorValue any, opts DriverOptions) (string, any, error)

OracleIncrementalValueFormatter is used to format the value of the cursor field for Oracle incremental sync, mainly because of the various timestamp formats

func OraclePrimaryKeyColummsQuery added in v0.1.3

func OraclePrimaryKeyColummsQuery(schemaName, tableName string) string

OraclePrimaryKeyColummsQuery returns the query to fetch all the primary key columns of a table in OracleDB

func OracleTableDetailsQuery added in v0.1.3

func OracleTableDetailsQuery(schemaName, tableName string) string

OracleTableDetailsQuery returns the query to fetch the details of a table in OracleDB

func OracleTableDiscoveryQuery added in v0.1.6

func OracleTableDiscoveryQuery() string

OracleTableDiscoveryQuery returns the query to fetch the username and table name of all the tables which the current user has access to in OracleDB

func OracleTableRowStatsQuery added in v0.2.9

func OracleTableRowStatsQuery() string

OracleTableRowStatsQuery returns the query to fetch the estimated row count of a table in Oracle

func OracleTaskCreationQuery added in v0.1.6

func OracleTaskCreationQuery(taskName string) string

OracleTaskCreationQuery returns the query to create a task in OracleDB

func PostgresBlockSizeQuery added in v0.1.9

func PostgresBlockSizeQuery() string

PostgresBlockSizeQuery returns the query to fetch the block size in PostgreSQL

func PostgresChunkScanQuery

func PostgresChunkScanQuery(stream types.StreamInterface, filterColumn string, chunk types.Chunk, filter string) string

PostgresChunkScanQuery builds a chunk scan query for PostgreSQL

func PostgresIsPartitionedQuery added in v0.2.9

func PostgresIsPartitionedQuery(stream types.StreamInterface) string

PostgresIsPartitionedQuery returns a SQL query that checks whether a table is partitioned. It counts how many partitions exist under the given parent table in the specified schema.

func PostgresNextChunkEndQuery

func PostgresNextChunkEndQuery(stream types.StreamInterface, filterColumn string, filterValue interface{}) string

PostgresNextChunkEndQuery generates a SQL query to fetch the maximum value of a specified column

func PostgresPartitionPages added in v0.2.9

func PostgresPartitionPages(stream types.StreamInterface) string

PostgresPartitionPages returns total relpages for each partition and the parent table. This can be used to dynamically adjust chunk sizes based on partition distribution.

func PostgresRelPageCount

func PostgresRelPageCount(stream types.StreamInterface) string

PostgresRelPageCount returns the query to fetch relation page count in PostgreSQL

func PostgresRowCountQuery

func PostgresRowCountQuery(stream types.StreamInterface) string

PostgreSQL-specific queries. TODO: rewrite the queries to take variables as arguments at execution time. PostgresRowCountQuery returns the query to fetch the estimated row count in PostgreSQL.

func PostgresWalLSNQuery

func PostgresWalLSNQuery() string

PostgresWalLSNQuery returns the query to fetch the current WAL LSN in PostgreSQL

func QuoteColumns added in v0.2.0

func QuoteColumns(columns []string, driver constants.DriverType) []string

QuoteColumns returns a slice of quoted column names

func QuoteIdentifier added in v0.2.0

func QuoteIdentifier(identifier string, driver constants.DriverType) string

QuoteIdentifier returns the properly quoted identifier based on database driver

func QuoteTable added in v0.2.0

func QuoteTable(schema, table string, driver constants.DriverType) string

QuoteTable returns the properly quoted schema.table combination

func SQLFilter added in v0.1.6

func SQLFilter(stream types.StreamInterface, driver string, thresholdFilter string) (string, error)

SQLFilter converts a filter string to a valid SQL WHERE condition, and also appends the threshold filter if present

func ThresholdFilter added in v0.2.7

func ThresholdFilter(ctx context.Context, opts DriverOptions) (string, []any, error)

ThresholdFilter is used to update the filter for the initial run of incremental sync during backfill. This is to avoid duplication of records, as the max cursor value is fetched before the chunk creation.

func WithIsolation

func WithIsolation(ctx context.Context, client *sqlx.DB, fn func(tx *sql.Tx) error) error

Types

type DriverOptions added in v0.2.7

type DriverOptions struct {
	Driver constants.DriverType
	Stream types.StreamInterface
	State  *types.State
	Client *sqlx.DB
}

DriverOptions contains options for creating various queries

type Reader

type Reader[T types.Iterable] struct {
	// contains filtered or unexported fields
}

func NewReader

func NewReader[T types.Iterable](ctx context.Context, baseQuery string,
	exec func(ctx context.Context, query string, args ...any) (T, error), args ...any) *Reader[T]

func (*Reader[T]) Capture

func (o *Reader[T]) Capture(onCapture func(T) error) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL