dbconn

package
v0.15.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 15, 2026 License: Apache-2.0 Imports: 24 Imported by: 6

README

Database Connections

The dbconn package provides MySQL database connection management and locking utilities for Spirit. It wraps database/sql with Spirit-specific concerns: retry logic, TLS auto-configuration, advisory locking, table locking, and the ability to kill blocking transactions.

Connection Setup

When creating a new connection, Spirit appends standardized DSN parameters to ensure consistent behavior across all connections. These include setting sql_mode="" (so that legacy values such as 0000-00-00 dates can be copied), time_zone=+00:00, transaction_isolation=read-committed, charset=utf8mb4, collation=utf8mb4_bin, and rejectReadOnly=true (for Aurora failover resilience). This means that regardless of the server's global configuration, Spirit connections behave predictably.

TLS

Spirit supports five TLS modes: DISABLED, PREFERRED, REQUIRED, VERIFY_CA, and VERIFY_IDENTITY. The default is PREFERRED, which first attempts a TLS connection and falls back to plaintext if it fails. RDS hosts are auto-detected via hostname pattern matching (*.rds.amazonaws.com), and an embedded RDS CA bundle is used automatically.

Retryable Transactions

RetryableTransaction is the primary mechanism for executing statements that may encounter transient errors. It classifies MySQL errors into retryable (deadlocks, lock wait timeouts, connection loss, read-only mode, killed queries) and fatal (everything else). On transient errors, the entire transaction is retried up to MaxRetries times.

An important subtlety is that RetryableTransaction runs SHOW WARNINGS after every statement. This catches issues that MySQL does not surface as errors, such as the warning that range_optimizer_max_mem_size was exceeded. That particular warning is treated as fatal because it indicates the query will fall back to a table scan instead of an index range scan.
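Checking warnings from Go might look like the sketch below. SHOW WARNINGS reports on the last statement executed in the current session, so it has to run on the same transaction as the statement it checks. The code 3170 for the range-optimizer warning, the duplicate-key handling via `ignoreDupKeyWarnings`, and the helper names are assumptions for illustration:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// warning mirrors the Level, Code and Message columns of SHOW WARNINGS.
type warning struct {
	Level   string
	Code    int
	Message string
}

// readWarnings must use the same transaction (and so the same session) as
// the statement being checked, because SHOW WARNINGS reports on the last
// statement executed in the current session.
func readWarnings(ctx context.Context, tx *sql.Tx) ([]warning, error) {
	rows, err := tx.QueryContext(ctx, "SHOW WARNINGS")
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var ws []warning
	for rows.Next() {
		var w warning
		if err := rows.Scan(&w.Level, &w.Code, &w.Message); err != nil {
			return nil, err
		}
		ws = append(ws, w)
	}
	return ws, rows.Err()
}

// checkWarnings is a hypothetical policy for illustration only.
func checkWarnings(ws []warning, ignoreDupKeyWarnings bool) error {
	for _, w := range ws {
		switch {
		case w.Code == 3170: // assumed code for "range_optimizer_max_mem_size exceeded"
			return fmt.Errorf("fatal warning %d: %s", w.Code, w.Message)
		case w.Code == 1062 && !ignoreDupKeyWarnings: // ER_DUP_ENTRY
			return fmt.Errorf("duplicate key warning: %s", w.Message)
		}
		// Other warnings (and ignored duplicate-key warnings) pass in this sketch.
	}
	return nil
}

func main() {
	dup := []warning{{Level: "Warning", Code: 1062, Message: "Duplicate entry '1' for key 'PRIMARY'"}}
	fmt.Println(checkWarnings(dup, true)) // <nil>
}
```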

Force Kill

Both ForceExec and NewTableLock implement a timer-based force-kill pattern. They wait for 90% of LockWaitTimeout, then query performance_schema to identify and kill transactions that are blocking metadata lock acquisition. This is enabled by default and can be disabled with Spirit's --skip-force-kill flag.

There are two important safety constraints:

  1. Transaction weight threshold: Transactions with a weight above 1,000,000 (as reported by information_schema.innodb_trx.trx_weight) are never killed, because their rollback would be expensive and disruptive.
  2. Explicit table locks: Connections holding LOCK TABLES are never killed. Instead, an ErrTableLockFound error is returned, because killing a session that holds non-transactional locks is unsafe.

Metadata Lock

MetadataLock provides an advisory locking mechanism using MySQL's GET_LOCK() function. It runs on a dedicated single-connection database pool with a background goroutine that periodically refreshes the lock. If the connection drops, it automatically reconnects and re-acquires locks.

Lock names are derived deterministically from schema.table and truncated with a SHA1 suffix to fit MySQL's 64-character limit for lock names. The lock prevents concurrent Spirit migrations on the same table.
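One way to derive such names and take the lock is sketched below. The exact naming scheme (here `schema.table`, truncated and given an 8-character SHA1 hex suffix when too long) is an assumption; the dedicated connection reflects the fact that GET_LOCK() locks belong to the session that acquired them:

```go
package main

import (
	"context"
	"crypto/sha1"
	"database/sql"
	"encoding/hex"
	"errors"
	"fmt"
)

const maxLockNameLen = 64 // MySQL's limit for GET_LOCK() names

// lockName is a hypothetical derivation: use schema.table when it fits,
// otherwise truncate and append a short SHA1 suffix so the result is both
// deterministic and within the limit. Assumes ASCII names.
func lockName(schema, table string) string {
	name := schema + "." + table
	if len(name) <= maxLockNameLen {
		return name
	}
	sum := sha1.Sum([]byte(name))
	suffix := "-" + hex.EncodeToString(sum[:])[:8]
	return name[:maxLockNameLen-len(suffix)] + suffix
}

// acquire takes the advisory lock on a dedicated *sql.Conn. GET_LOCK is
// session-scoped, so it must not go through an arbitrary pooled connection.
func acquire(ctx context.Context, conn *sql.Conn, name string) error {
	var ok sql.NullInt64 // 1 = acquired, 0 = timed out, NULL = error
	if err := conn.QueryRowContext(ctx, "SELECT GET_LOCK(?, 0)", name).Scan(&ok); err != nil {
		return err
	}
	if !ok.Valid || ok.Int64 != 1 {
		return errors.New("lock " + name + " is held by another session")
	}
	return nil
}

func main() {
	fmt.Println(lockName("test", "t1")) // test.t1
	fmt.Println(len(lockName("analytics", "a_table_name_that_is_far_too_long_to_fit_into_a_mysql_lock_name"))) // 64
}
```

If the session that holds the lock disconnects, MySQL releases the lock, which is why a reconnect must be followed by re-acquiring it.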

Table Lock

TableLock wraps MySQL's LOCK TABLES ... WRITE statement. It integrates with the force-kill mechanism to automatically kill blocking transactions if the lock cannot be acquired within the timeout. This is used during the cutover phase.
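The statement itself can be built as below; `tableRef`, `lockTablesStmt` and `lockOn` are hypothetical helpers, and `_t1_new` is only an example table name. All tables go into a single LOCK TABLES statement because each LOCK TABLES implicitly releases any table locks the session already holds:

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
)

// quoteIdent backtick-quotes a MySQL identifier, doubling embedded backticks.
func quoteIdent(s string) string {
	return "`" + strings.ReplaceAll(s, "`", "``") + "`"
}

// tableRef is a hypothetical schema/table pair.
type tableRef struct{ Schema, Name string }

// lockTablesStmt builds one LOCK TABLES ... WRITE covering every table.
func lockTablesStmt(tables []tableRef) string {
	parts := make([]string, len(tables))
	for i, t := range tables {
		parts[i] = quoteIdent(t.Schema) + "." + quoteIdent(t.Name) + " WRITE"
	}
	return "LOCK TABLES " + strings.Join(parts, ", ")
}

// lockOn acquires the lock on one dedicated connection with a short session
// lock_wait_timeout (seconds). The lock belongs to conn's session, so later
// statements and UNLOCK TABLES must use the same conn.
func lockOn(ctx context.Context, conn *sql.Conn, timeoutSecs int, tables []tableRef) error {
	// An int cannot carry SQL, so formatting it directly is safe here.
	if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET SESSION lock_wait_timeout = %d", timeoutSecs)); err != nil {
		return err
	}
	_, err := conn.ExecContext(ctx, lockTablesStmt(tables))
	return err
}

func main() {
	fmt.Println(lockTablesStmt([]tableRef{{"test", "t1"}, {"test", "_t1_new"}}))
	// LOCK TABLES `test`.`t1` WRITE, `test`.`_t1_new` WRITE
}
```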

Transaction Pool

TrxPool pre-creates a pool of REPEATABLE READ transactions with START TRANSACTION WITH CONSISTENT SNAPSHOT. This ensures all worker threads see the same point-in-time data, which is essential for parallel checksum verification.

Documentation

Overview

Package dbconn contains a series of database-related utility functions.

Constants

const (
	// TableLockQuery is used to find tables that are locked by a LOCK TABLES command.
	// It's not really possible to find out how long the lock has been held, so we don't consider
	// the length of the lock here.
	TableLockQuery = `` /* 440-byte string literal not displayed */

	LongRunningEventQuery = `` /* 751-byte string literal not displayed */

)

Variables

var (

	// TransactionWeightThreshold is the maximum information_schema.innodb_trx.trx_weight
	// over which we consider a transaction too big to be safely killed. Rolling back a
	// heavy transaction can cause a huge impact on the database.
	TransactionWeightThreshold int64 = 1_000_000

	ErrTableLockFound = errors.New("explicit table lock found! spirit cannot proceed")
)

Functions

func BeginStandardTrx

func BeginStandardTrx(ctx context.Context, db *sql.DB, opts *sql.TxOptions) (*sql.Tx, int, error)

BeginStandardTrx is like db.BeginTx but also returns the connection ID.

func EnhanceDSNWithTLS

func EnhanceDSNWithTLS(inputDSN string, config *DBConfig) (string, error)

EnhanceDSNWithTLS enhances a DSN with TLS settings from the provided config if the DSN doesn't already contain TLS parameters. This allows replica connections to inherit TLS settings from the main connection while still respecting explicit TLS configuration in the DSN.
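A minimal sketch of the "only if the DSN doesn't already say" rule, assuming go-sql-driver's `tls` DSN parameter. The helper and its `tlsValue` argument are hypothetical; the real function derives its setting from DBConfig:

```go
package main

import (
	"fmt"
	"strings"
)

// enhanceDSNWithTLS is a hypothetical sketch of the rule described above:
// an explicit tls=... parameter in the DSN always wins; otherwise the value
// inherited from the main connection is appended. tlsValue is assumed to be
// a go-sql-driver tls setting such as "preferred" or a registered config name.
func enhanceDSNWithTLS(dsn, tlsValue string) string {
	_, query, hasQuery := strings.Cut(dsn, "?")
	if !hasQuery {
		return dsn + "?tls=" + tlsValue
	}
	for _, kv := range strings.Split(query, "&") {
		if strings.HasPrefix(kv, "tls=") {
			return dsn // explicit TLS configuration: leave it alone
		}
	}
	return dsn + "&tls=" + tlsValue
}

func main() {
	fmt.Println(enhanceDSNWithTLS("replica:pw@tcp(replica1:3306)/test", "preferred"))
	// replica:pw@tcp(replica1:3306)/test?tls=preferred
}
```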

func Exec

func Exec(ctx context.Context, db *sql.DB, stmt string, args ...any) error

Exec is like db.Exec but only returns an error, which makes it a little easier to use in error handling. It accepts args that are escaped client-side using the TiDB escape library: %n is an identifier, and %? converts a value automatically based on its type.
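To make the placeholder convention concrete, here is a toy formatter. It is not the TiDB escape library: it supports only string and int arguments, assumes backslash escapes are enabled (true with the empty sql_mode described above), and exists only to show what %n and %? expand to:

```go
package main

import (
	"fmt"
	"strings"
)

// format is a toy illustration of the %n / %? convention, NOT a real
// escaping library. %n quotes an identifier; %? renders a literal by type.
func format(stmt string, args ...any) (string, error) {
	var b strings.Builder
	for i := 0; i < len(stmt); i++ {
		if stmt[i] != '%' || i+1 == len(stmt) || (stmt[i+1] != 'n' && stmt[i+1] != '?') {
			b.WriteByte(stmt[i]) // ordinary character
			continue
		}
		verb := stmt[i+1]
		i++ // consume the verb
		if len(args) == 0 {
			return "", fmt.Errorf("missing argument for %%%c", verb)
		}
		arg := args[0]
		args = args[1:]
		switch v := arg.(type) {
		case string:
			if verb == 'n' {
				b.WriteString("`" + strings.ReplaceAll(v, "`", "``") + "`")
			} else {
				b.WriteString("'" + strings.NewReplacer(`\`, `\\`, `'`, `\'`).Replace(v) + "'")
			}
		case int:
			if verb == 'n' {
				return "", fmt.Errorf("%%n needs a string, got %d", v)
			}
			fmt.Fprintf(&b, "%d", v)
		default:
			return "", fmt.Errorf("unsupported argument type %T", arg)
		}
	}
	if len(args) > 0 {
		return "", fmt.Errorf("%d unused arguments", len(args))
	}
	return b.String(), nil
}

func main() {
	q, err := format("ALTER TABLE %n.%n COMMENT = %?", "test", "t1", "it's")
	fmt.Println(q, err)
}
```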

func ForceExec

func ForceExec(ctx context.Context, db *sql.DB, tables []*table.TableInfo, dbConfig *DBConfig, logger *slog.Logger, stmt string, args ...any) error

ForceExec is like Exec but it has some added logic to force kill any connections that are holding up metadata locks preventing this from succeeding.

func GetEmbeddedRDSBundle

func GetEmbeddedRDSBundle() []byte

GetEmbeddedRDSBundle returns the embedded RDS certificate bundle.

func GetLockingTransactions

func GetLockingTransactions(ctx context.Context, db *sql.DB, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger, ignorePIDs []int) ([]int, error)

GetLockingTransactions queries the performance schema to find transactions that hold locks on the specified tables and returns their PIDs. If no tables are specified, it returns all long-running transactions. Transactions whose weight exceeds TransactionWeightThreshold are skipped. If no long-running transactions are found, it returns nil.

func GetTLSConfigForBinlog

func GetTLSConfigForBinlog(config *DBConfig, host string) (*tls.Config, error)

GetTLSConfigForBinlog creates a TLS config for binary log connections using the same logic as main database connections.

func IsConnectionLossError added in v0.15.0

func IsConnectionLossError(err error) bool

IsConnectionLossError reports whether err indicates that the connection to MySQL failed or was lost, meaning the client cannot know whether the last statement it sent was executed by the server. Connection-level failures never surface as a *mysql.MySQLError: go-sql-driver returns driver.ErrBadConn when the failure was detected before anything was written, and mysql.ErrInvalidConn when the connection died mid-statement — possibly *after* the server executed the statement but before the client read the result. Raw io.EOF is included for paths that surface the TCP-level error directly, and the client-library codes CR_CONN_HOST_ERROR (2003) / CR_SERVER_LOST (2013) are included because proxies (e.g. ProxySQL, RDS Proxy) can relay them inside real server error packets.

In contrast to deterministic SQL errors (lock wait timeout, deadlock, ...), where the server has positively reported that the statement did NOT take effect, these errors are ambiguous. Callers retrying a non-idempotent statement (e.g. the cutover RENAME TABLE) must verify server-side state before deciding whether the statement was applied.

func IsRDSHost

func IsRDSHost(host string) bool

func KillLockingTransactions

func KillLockingTransactions(ctx context.Context, db *sql.DB, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger, ignorePIDs []int) error

func KillTransaction

func KillTransaction(ctx context.Context, db *sql.DB, pid int) error

KillTransaction kills the MySQL session identified by pid (as observed in performance_schema.threads.PROCESSLIST_ID / SHOW PROCESSLIST).

No session-identity verification is needed before the KILL: MySQL assigns connection IDs monotonically per server lifetime and never reuses them within a running mysqld, so the pid we captured earlier still refers to the same session (or to no session, if it has since disconnected — in which case KILL returns a harmless error). Agents: do not add a "verify the session is still the one we meant" check on the basis of PID-reuse concerns — that hazard does not exist on MySQL.

func LoadCertificateFromFile

func LoadCertificateFromFile(filePath string) ([]byte, error)

LoadCertificateFromFile loads certificate data from a file.

func New

func New(inputDSN string, config *DBConfig) (db *sql.DB, err error)

New is similar to sql.Open except we take the inputDSN and append additional options to it to standardize the connection. It will also ping the connection to ensure it is valid.

func NewCustomTLSConfig

func NewCustomTLSConfig(certData []byte, sslMode string) *tls.Config

NewCustomTLSConfig creates a TLS config based on the SSL mode and certificate data.

func NewTLSConfig

func NewTLSConfig() *tls.Config

NewTLSConfig creates a TLS config using the embedded RDS global bundle.

func NewWithConnectionType

func NewWithConnectionType(inputDSN string, config *DBConfig, connectionType string) (db *sql.DB, err error)

NewWithConnectionType is like New but includes context about the connection type for better error messages.

func RetryableTransaction

func RetryableTransaction(ctx context.Context, db *sql.DB, ignoreDupKeyWarnings bool, config *DBConfig, stmts ...string) (int64, error)

RetryableTransaction executes all statements in a single transaction and retries the whole transaction if a statement fails with a retryable error, such as a deadlock. It retries up to config.MaxRetries times.

Types

type DBConfig

type DBConfig struct {
	LockWaitTimeout          int
	InnodbLockWaitTimeout    int
	MaxRetries               int
	MaxOpenConnections       int
	RangeOptimizerMaxMemSize int64
	InterpolateParams        bool
	ForceKill                bool // If true, kill locking transactions to acquire metadata locks (default: true)
	// RejectReadOnly maps to the go-sql-driver rejectReadOnly option: a
	// statement that fails with a read-only error (1290/1792/1836) is turned
	// into driver.ErrBadConn so database/sql throws the connection away and
	// reconnects. This guards against landing on a demoted, now-read-only
	// Aurora primary after a blue/green deploy or failover (default: true).
	//
	// An injected, read-only change.Source (e.g. a Vitess/PlanetScale VStream
	// import) connects to a read-only replica on purpose. With this enabled,
	// the replica's read-only responses would loop every source statement to
	// "driver: bad connection", so the move runner disables it for that case.
	RejectReadOnly bool
	// TLS Configuration
	TLSMode            string // TLS connection mode (DISABLED, PREFERRED, REQUIRED, VERIFY_CA, VERIFY_IDENTITY)
	TLSCertificatePath string // Path to custom TLS certificate file
}

func NewDBConfig

func NewDBConfig() *DBConfig

type LockDetail

type LockDetail struct {
	PID          int
	User         sql.NullString
	Host         sql.NullString
	Info         sql.NullString
	ObjectType   sql.NullString
	ObjectSchema sql.NullString
	ObjectName   sql.NullString
	LockType     sql.NullString // e.g. "INTENTION_EXCLUSIVE", "SHARED_READ"
	LockDuration sql.NullString // e.g. "STATEMENT", "TRANSACTION"
	LockStatus   sql.NullString
	RunningTime  sql.NullString // Human-readable format of the timer_wait
	TimerWait    sql.NullInt64  // in picoseconds
	TrxWeight    sql.NullInt64  // innodb_trx.trx_weight: roughly rows modified plus rows locked
}

func GetTableLocks

func GetTableLocks(ctx context.Context, db *sql.DB, tables []*table.TableInfo, logger *slog.Logger, ignorePIDs []int) ([]*LockDetail, error)

type MetadataLock

type MetadataLock struct {
	// contains filtered or unexported fields
}

func NewMetadataLock

func NewMetadataLock(ctx context.Context, dsn string, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger, optionFns ...func(*MetadataLock)) (*MetadataLock, error)

func (*MetadataLock) Close

func (m *MetadataLock) Close() error

func (*MetadataLock) CloseDBConnection

func (m *MetadataLock) CloseDBConnection(logger *slog.Logger) error

type TableLock

type TableLock struct {
	// contains filtered or unexported fields
}

func NewTableLock

func NewTableLock(ctx context.Context, db *sql.DB, tables []*table.TableInfo, config *DBConfig, logger *slog.Logger) (*TableLock, error)

NewTableLock creates a new server-wide lock on multiple tables (that is, LOCK TABLES ... WRITE). It uses a short timeout and *does not retry*. The caller is expected to retry, which gives it a chance to first do things like catch up on replication apply before the next attempt.

config.ForceKill=true is the default. It makes lock acquisition very likely to succeed: after waiting for 90% of the configured LockWaitTimeout, it kills long-running queries that are blocking the lock. It can be disabled with --skip-force-kill.

func (*TableLock) Close

func (s *TableLock) Close(ctx context.Context) error

Close closes the table lock.

func (*TableLock) DB added in v0.15.0

func (s *TableLock) DB() *sql.DB

DB returns the database connection pool this lock was acquired on. Because LOCK TABLES ... WRITE blocks writes from every other connection, any write to a locked table must go through this lock's own transaction. Callers holding locks on multiple servers (e.g. one per shard) use this to match each lock to the target it belongs to.

func (*TableLock) ExecUnderLock

func (s *TableLock) ExecUnderLock(ctx context.Context, stmts ...string) error

ExecUnderLock executes a set of statements under a table lock.

type TrxPool

type TrxPool struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewTrxPool

func NewTrxPool(ctx context.Context, db *sql.DB, count int, config *DBConfig) (*TrxPool, error)

NewTrxPool creates a pool of transactions which have already had their read-view created in REPEATABLE READ isolation.

func (*TrxPool) Close

func (p *TrxPool) Close() error

Close closes all transactions in the pool.

func (*TrxPool) Get

func (p *TrxPool) Get() (*sql.Tx, error)

Get gets a transaction from the pool.

func (*TrxPool) Put

func (p *TrxPool) Put(trx *sql.Tx)

Put puts a transaction back in the pool.

Directories

Path Synopsis
Package sqlescape provides SQL escaping functionality.