dbconn

package
v0.11.0
Published: Mar 4, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

README

Database Connections

The dbconn package provides MySQL database connection management and locking utilities for Spirit. It wraps database/sql with Spirit-specific concerns: retry logic, TLS auto-configuration, advisory locking, table locking, and the ability to kill blocking transactions.

Connection Setup

When creating a new connection, Spirit appends standardized DSN parameters to ensure consistent behavior across all connections. These include setting sql_mode="" (to be able to copy legacy data like 0000-00-00), time_zone=+00:00, transaction_isolation=read-committed, charset=utf8mb4, collation=utf8mb4_bin, and rejectReadOnly=true (for Aurora failover resilience). This means that regardless of the server's global configuration, Spirit connections behave predictably.

TLS

Spirit supports five TLS modes: DISABLED, PREFERRED, REQUIRED, VERIFY_CA, and VERIFY_IDENTITY. The default is PREFERRED, which first attempts a TLS connection and falls back to plaintext if it fails. RDS hosts are auto-detected via hostname pattern matching (*.rds.amazonaws.com), and an embedded RDS CA bundle is used automatically.
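The mode handling and hostname matching above can be sketched as follows. Both function names are hypothetical, and the case-insensitive matching and port stripping are assumptions rather than documented behavior of the package's own IsRDSHost.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// The five modes listed in the README.
var tlsModes = map[string]bool{
	"DISABLED":        true,
	"PREFERRED":       true,
	"REQUIRED":        true,
	"VERIFY_CA":       true,
	"VERIFY_IDENTITY": true,
}

// normalizeTLSMode returns the mode in upper case, defaulting to
// PREFERRED when no mode is given. (Hypothetical helper.)
func normalizeTLSMode(mode string) (string, error) {
	if mode == "" {
		return "PREFERRED", nil
	}
	m := strings.ToUpper(mode)
	if !tlsModes[m] {
		return "", fmt.Errorf("unsupported TLS mode %q", mode)
	}
	return m, nil
}

// looksLikeRDSHost reports whether host matches *.rds.amazonaws.com.
// An optional ":port" suffix is stripped first. (Hypothetical helper.)
func looksLikeRDSHost(host string) bool {
	if h, _, err := net.SplitHostPort(host); err == nil {
		host = h
	}
	return strings.HasSuffix(strings.ToLower(host), ".rds.amazonaws.com")
}

func main() {
	mode, _ := normalizeTLSMode("")
	fmt.Println(mode)
	fmt.Println(looksLikeRDSHost("mydb.abc123.us-east-1.rds.amazonaws.com:3306"))
	fmt.Println(looksLikeRDSHost("localhost:3306"))
}
```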

Retryable Transactions

RetryableTransaction is the primary mechanism for executing statements that may encounter transient errors. It classifies MySQL errors into retryable (deadlocks, lock wait timeouts, connection loss, read-only mode, killed queries) and fatal (everything else). On transient errors, the entire transaction is retried up to MaxRetries times.

An important subtlety is that RetryableTransaction inspects SHOW WARNINGS after every statement. This catches issues that MySQL does not surface as errors, such as range_optimizer_max_mem_size exceeded warnings. This particular warning is treated as fatal because it indicates a table scan will occur instead of an index range scan.

Force Kill

Both ForceExec and NewTableLock implement a timer-based force-kill pattern. They wait for 90% of LockWaitTimeout, then query performance_schema to identify and kill transactions that are blocking metadata lock acquisition. This is enabled by default and can be disabled with Spirit's --skip-force-kill flag.

There are two important safety constraints:

  1. Transaction weight threshold: Transactions with a weight above 1,000,000 (as reported by information_schema.innodb_trx.trx_weight) are never killed, because their rollback would be expensive and disruptive.
  2. Explicit table locks: Connections holding LOCK TABLES are never killed. Instead, an ErrTableLockFound error is returned. This is because killing non-transactional locks is unsafe.

Metadata Lock

MetadataLock provides an advisory locking mechanism using MySQL's GET_LOCK() function. It runs on a dedicated single-connection database pool with a background goroutine that periodically refreshes the lock. If the connection drops, it automatically reconnects and re-acquires locks.

Lock names are derived deterministically from schema.table, truncated and given a SHA1 suffix to fit MySQL's 64-character limit for lock names. These locks prevent concurrent Spirit migrations on the same table.

Table Lock

TableLock wraps MySQL's LOCK TABLES ... WRITE statement. It integrates with the force-kill mechanism to automatically kill blocking transactions if the lock cannot be acquired within the timeout. This is used during the cutover phase.

Transaction Pool

TrxPool pre-creates a pool of REPEATABLE READ transactions with START TRANSACTION WITH CONSISTENT SNAPSHOT. This ensures all worker threads see the same point-in-time data, which is essential for parallel checksum verification.

Documentation

Overview

Package dbconn contains a series of database-related utility functions.

Constants

const (
	// TableLockQuery is used to find tables that are locked by a LOCK TABLES command.
	// It's not really possible to find out how long the lock has been held, so we don't consider
	// the length of the lock here.
	TableLockQuery = `` /* 440-byte string literal not displayed */

	LongRunningEventQuery = `` /* 751-byte string literal not displayed */

)

Variables

var (

	// TransactionWeightThreshold is the maximum information_schema.innodb_trx.trx_weight
	// over which we consider a transaction too big to be safely killed. Rolling back a
	// heavy transaction can cause a huge impact on the database.
	TransactionWeightThreshold int64 = 1_000_000

	ErrTableLockFound = errors.New("explicit table lock found! spirit cannot proceed")
)

Functions

func BeginStandardTrx

func BeginStandardTrx(ctx context.Context, db *sql.DB, opts *sql.TxOptions) (*sql.Tx, int, error)

BeginStandardTrx is like db.BeginTx but returns the connection id.

func EnhanceDSNWithTLS

func EnhanceDSNWithTLS(inputDSN string, config *DBConfig) (string, error)

EnhanceDSNWithTLS enhances a DSN with TLS settings from the provided config if the DSN doesn't already contain TLS parameters. This allows replica connections to inherit TLS settings from the main connection while still respecting explicit TLS configuration in the DSN.
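The "only if not already present" check can be sketched as below. The helper hasTLSParam is hypothetical and simplified: it assumes a go-sql-driver style DSN in which the driver's tls parameter appears in the query portion after the last question mark.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// hasTLSParam reports whether the DSN already carries an explicit tls
// parameter, in which case inherited TLS settings should not be applied.
func hasTLSParam(dsn string) bool {
	i := strings.LastIndex(dsn, "?")
	if i == -1 {
		return false
	}
	q, err := url.ParseQuery(dsn[i+1:])
	if err != nil {
		return false
	}
	_, ok := q["tls"]
	return ok
}

func main() {
	fmt.Println(hasTLSParam("u:p@tcp(replica:3306)/db?tls=skip-verify"))
	fmt.Println(hasTLSParam("u:p@tcp(replica:3306)/db?charset=utf8mb4"))
}
```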

func Exec

func Exec(ctx context.Context, db *sql.DB, stmt string, args ...any) error

Exec is like db.Exec but returns only an error, which makes it slightly easier to use in error handling. It accepts args, which are escaped client side using the TiDB escape library. For example, %n marks an identifier and %? applies automatic type conversion to a value.

func ForceExec

func ForceExec(ctx context.Context, db *sql.DB, tables []*table.TableInfo, dbConfig *DBConfig, logger *slog.Logger, stmt string, args ...any) error

ForceExec is like Exec but it has some added logic to force kill any connections that are holding up metadata locks preventing this from succeeding.

func GetEmbeddedRDSBundle

func GetEmbeddedRDSBundle() []byte

GetEmbeddedRDSBundle returns the embedded RDS certificate bundle.

func GetLockingTransactions

func GetLockingTransactions(ctx context.Context, db *sql.DB, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger, ignorePIDs []int) ([]int, error)

GetLockingTransactions queries the performance schema to find locking transactions that are holding locks on the specified tables. It returns a list of PIDs of these transactions. If no tables are specified, it will return all long-running transactions. If a transaction's weight exceeds the TransactionWeightThreshold, it will be skipped. If no long-running transactions are found, it returns nil.

func GetTLSConfigForBinlog

func GetTLSConfigForBinlog(config *DBConfig, host string) (*tls.Config, error)

GetTLSConfigForBinlog creates a TLS config for binary log connections using the same logic as main database connections.

func IsRDSHost

func IsRDSHost(host string) bool

func KillLockingTransactions

func KillLockingTransactions(ctx context.Context, db *sql.DB, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger, ignorePIDs []int) error

func KillTransaction

func KillTransaction(ctx context.Context, db *sql.DB, pid int) error

func LoadCertificateFromFile

func LoadCertificateFromFile(filePath string) ([]byte, error)

LoadCertificateFromFile loads certificate data from a file.

func New

func New(inputDSN string, config *DBConfig) (db *sql.DB, err error)

New is similar to sql.Open except we take the inputDSN and append additional options to it to standardize the connection. It will also ping the connection to ensure it is valid.

func NewCustomTLSConfig

func NewCustomTLSConfig(certData []byte, sslMode string) *tls.Config

NewCustomTLSConfig creates a TLS config based on SSL mode and certificate data.

func NewTLSConfig

func NewTLSConfig() *tls.Config

NewTLSConfig creates a TLS config using the embedded RDS global bundle.

func NewWithConnectionType

func NewWithConnectionType(inputDSN string, config *DBConfig, connectionType string) (db *sql.DB, err error)

NewWithConnectionType is like New but includes context about the connection type for better error messages.

func RetryableTransaction

func RetryableTransaction(ctx context.Context, db *sql.DB, ignoreDupKeyWarnings bool, config *DBConfig, stmts ...string) (int64, error)

RetryableTransaction executes all statements in a single transaction and retries the whole transaction if a statement fails with a retryable error, such as a deadlock. It retries up to config.MaxRetries times.

Types

type DBConfig

type DBConfig struct {
	LockWaitTimeout          int
	InnodbLockWaitTimeout    int
	MaxRetries               int
	MaxOpenConnections       int
	RangeOptimizerMaxMemSize int64
	InterpolateParams        bool
	ForceKill                bool // If true, kill locking transactions to acquire metadata locks (default: true)
	// TLS Configuration
	TLSMode            string // TLS connection mode (DISABLED, PREFERRED, REQUIRED, VERIFY_CA, VERIFY_IDENTITY)
	TLSCertificatePath string // Path to custom TLS certificate file
}

func NewDBConfig

func NewDBConfig() *DBConfig

type LockDetail

type LockDetail struct {
	PID          int
	User         sql.NullString
	Host         sql.NullString
	Info         sql.NullString
	ObjectType   sql.NullString
	ObjectSchema sql.NullString
	ObjectName   sql.NullString
	LockType     sql.NullString // e.g. "INTENTION_EXCLUSIVE", "SHARED_READ",
	LockDuration sql.NullString // e.g. "STATEMENT", "TRANSACTION"
	LockStatus   sql.NullString
	RunningTime  sql.NullString // Human-readable format of the timer_wait
	TimerWait    sql.NullInt64  // in picoseconds
	TrxWeight    sql.NullInt64  // Rows modified by the transaction
}

func GetTableLocks

func GetTableLocks(ctx context.Context, db *sql.DB, tables []*table.TableInfo, logger *slog.Logger, ignorePIDs []int) ([]*LockDetail, error)

type MetadataLock

type MetadataLock struct {
	// contains filtered or unexported fields
}

func NewMetadataLock

func NewMetadataLock(ctx context.Context, dsn string, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger, optionFns ...func(*MetadataLock)) (*MetadataLock, error)

func (*MetadataLock) Close

func (m *MetadataLock) Close() error

func (*MetadataLock) CloseDBConnection

func (m *MetadataLock) CloseDBConnection(logger *slog.Logger) error

type TableLock

type TableLock struct {
	// contains filtered or unexported fields
}

func NewTableLock

func NewTableLock(ctx context.Context, db *sql.DB, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger) (*TableLock, error)

NewTableLock creates a new server wide lock on multiple tables. i.e. LOCK TABLES .. WRITE. It uses a short timeout and *does not retry*. The caller is expected to retry, which gives it a chance to first do things like catch up on replication apply before it does the next attempt.

config.ForceKill=true is the default, and will more or less ensure that the lock acquisition is successful by killing long-running queries that are blocking our lock acquisition after we have waited for 90% of our configured LockWaitTimeout. It can be disabled with --skip-force-kill.

func (*TableLock) Close

func (s *TableLock) Close(ctx context.Context) error

Close closes the table lock.

func (*TableLock) ExecUnderLock

func (s *TableLock) ExecUnderLock(ctx context.Context, stmts ...string) error

ExecUnderLock executes a set of statements under a table lock.

type TrxPool

type TrxPool struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTrxPool

func NewTrxPool(ctx context.Context, db *sql.DB, count int, config *DBConfig) (*TrxPool, error)

NewTrxPool creates a pool of transactions which have already had their read-view created in REPEATABLE READ isolation.

func (*TrxPool) Close

func (p *TrxPool) Close() error

Close closes all transactions in the pool.

func (*TrxPool) Get

func (p *TrxPool) Get() (*sql.Tx, error)

Get gets a transaction from the pool.

func (*TrxPool) Put

func (p *TrxPool) Put(trx *sql.Tx)

Put puts a transaction back in the pool.

Directories

Path Synopsis
Package sqlescape provides SQL escaping functionality.
