crdb

package
v2.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2025 License: Apache-2.0 Imports: 6 Imported by: 86

README

CRDB

crdb is a wrapper around the logic for issuing SQL transactions which performs retries (as required by CockroachDB).

Basic Usage

import "github.com/cockroachdb/cockroach-go/v2/crdb"

err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
    // Your transaction logic here
    _, err := tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - 100 WHERE id = 1")
    return err
})

Retry Policies

By default, transactions retry up to 50 times with no delay between attempts. You can customize retry behavior using context options.

Limiting Retries
// Retry up to 10 times
ctx := crdb.WithMaxRetries(context.Background(), 10)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
    // ...
})
Unlimited Retries
// Retry indefinitely (use with caution - ensure you have a context timeout!)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ctx = crdb.WithMaxRetries(ctx, 0)
Disabling Retries
// Execute only once, no retries
ctx := crdb.WithNoRetries(context.Background())
Fixed Delay Between Retries
ctx := crdb.WithRetryPolicy(context.Background(), &crdb.LimitBackoffRetryPolicy{
    RetryLimit: 10,
    Delay:      100 * time.Millisecond,
})
Exponential Backoff
ctx := crdb.WithRetryPolicy(context.Background(), &crdb.ExpBackoffRetryPolicy{
    RetryLimit: 10,
    BaseDelay:  100 * time.Millisecond,  // First retry waits 100ms
    MaxDelay:   5 * time.Second,          // Cap delay at 5s
})
// Delays: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, 5s, 5s, 5s, 5s
Custom Retry Policies

Implement the RetryPolicy interface for custom behavior:

type RetryPolicy interface {
    NewRetry() RetryFunc
}

type RetryFunc func(err error) (delay time.Duration, retryErr error)

You can also adapt third-party backoff libraries using ExternalBackoffPolicy():

import "github.com/sethvargo/go-retry"

ctx := crdb.WithRetryPolicy(context.Background(), crdb.ExternalBackoffPolicy(func() crdb.ExternalBackoff {
    return retry.NewFibonacci(1 * time.Second)
}))

Framework Support

Subpackages provide support for popular frameworks:

Package Framework Import
crdbpgx pgx v4 (standalone) github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx
crdbpgxv5 pgx v5 (standalone) github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgxv5
crdbgorm GORM github.com/cockroachdb/cockroach-go/v2/crdb/crdbgorm
crdbsqlx sqlx github.com/cockroachdb/cockroach-go/v2/crdb/crdbsqlx

Error Wrapping

When wrapping errors inside transaction functions, use %w or errors.Wrap() to preserve retry detection:

// WRONG - masks retryable error
return fmt.Errorf("failed: %s", err)

// CORRECT - preserves error for retry detection
return fmt.Errorf("failed: %w", err)

Driver Compatibility

The library detects retryable errors using the SQLState() string method, which is implemented by:

Note for Developers

If you make any changes here (especially if they modify public APIs), please verify that the code in https://github.com/cockroachdb/examples-go still works and update as necessary.

Documentation

Overview

Package crdb provides helpers for using CockroachDB in client applications.

Index

Constants

View Source
const (
	// NoRetries is a sentinel value for LimitBackoffRetryPolicy.RetryLimit
	// indicating that no retries should be attempted. When a policy has
	// RetryLimit set to NoRetries, the transaction will be attempted only
	// once, and any retryable error will immediately return a
	// MaxRetriesExceededError.
	//
	// Use WithNoRetries(ctx) to create a context with this behavior.
	NoRetries = -1

	// UnlimitedRetries indicates that retries should continue indefinitely
	// until the transaction succeeds or a non-retryable error occurs. This
	// is represented by setting RetryLimit to 0.
	//
	// Use WithMaxRetries(ctx, 0) to create a context with unlimited retries,
	// though this is generally not recommended in production as it can lead
	// to infinite retry loops.
	UnlimitedRetries = 0
)

Variables

This section is empty.

Functions

func Execute

func Execute(fn func() error) (err error)

Execute runs fn and retries it as needed. It is used to add retry handling to the execution of a single statement. If a multi-statement transaction is being run, use ExecuteTx instead.

Retry handling for individual statements (implicit transactions) is usually performed automatically on the CockroachDB SQL gateway. As such, use of this function is generally not necessary. The exception to this rule is that automatic retries for individual statements are disabled once CockroachDB begins streaming results for the statements back to the client. By default, result streaming does not begin until the size of the result being produced for the client, including protocol overhead, exceeds 16KiB. As long as the results of a single statement or batch of statements are known to stay clear of this limit, the client does not need to worry about retries and should not need to use this function.

For more information about automatic transaction retries in CockroachDB, see https://cockroachlabs.com/docs/stable/transactions.html#automatic-retries.

NOTE: the supplied fn closure should not have external side effects beyond changes to the database.

fn must take care when wrapping errors returned from the database driver with additional context. For example, if the SELECT statement fails in the following snippet, the original retryable error will be masked by the call to fmt.Errorf, and the transaction will not be automatically retried.

crdb.Execute(func () error {
    rows, err := db.QueryContext(ctx, "SELECT ...")
    if err != nil {
        return fmt.Errorf("scanning row: %s", err)
    }
    defer rows.Close()
    for rows.Next() {
        // ...
    }
    if err := rows.Err(); err != nil {
        return fmt.Errorf("scanning row: %s", err)
    }
    return nil
})

Instead, add context by returning an error that implements either: - a `Cause() error` method, in the manner of github.com/pkg/errors, or - an `Unwrap() error` method, in the manner of the Go 1.13 standard library.

To achieve this, you can implement your own error type, or use `errors.Wrap()` from github.com/pkg/errors or github.com/cockroachdb/errors or a similar package, or use go 1.13's special `%w` formatter with fmt.Errorf(), for example fmt.Errorf("scanning row: %w", err).

import "github.com/pkg/errors"

crdb.Execute(func () error {
    rows, err := db.QueryContext(ctx, "SELECT ...")
    if err != nil {
        return errors.Wrap(err, "scanning row")
    }
    defer rows.Close()
    for rows.Next() {
        // ...
    }
    if err := rows.Err(); err != nil {
        return errors.Wrap(err, "scanning row")
    }
    return nil
})

func ExecuteCtx added in v2.4.0

func ExecuteCtx(ctx context.Context, fn ExecuteCtxFunc, args ...interface{}) (err error)

ExecuteCtx runs fn and retries it as needed, respecting a retry policy obtained from the context. It is used to add configurable retry handling to the execution of a single statement. If a multi-statement transaction is being run, use ExecuteTx instead.

The maximum number of retries can be configured using WithMaxRetries(ctx, n). Setting n=0 allows one attempt with no retries. If the number of retries is exhausted and the last attempt resulted in a retryable error, ExecuteCtx returns a max retries exceeded error wrapping the last retryable error encountered.

Arbitrary retry policies can be configured using WithRetryPolicy(ctx, p).

The fn parameter accepts variadic arguments which are passed through on each retry attempt, allowing for flexible parameterization of the retried operation.

As with Execute, retry handling for individual statements (implicit transactions) is usually performed automatically on the CockroachDB SQL gateway, making use of this function generally unnecessary. However, automatic retries are disabled once result streaming begins (typically when results exceed 16KiB).

NOTE: the supplied fn closure should not have external side effects beyond changes to the database.

fn must take care when wrapping errors returned from the database driver with additional context. To preserve retry behavior, errors should implement either `Cause() error` (github.com/pkg/errors) or `Unwrap() error` (Go 1.13+). For example:

crdb.ExecuteCtx(ctx, func(ctx context.Context, args ...interface{}) error {
    id := args[0].(int)
    rows, err := db.QueryContext(ctx, "SELECT * FROM users WHERE id = $1", id)
    if err != nil {
        return fmt.Errorf("scanning row: %w", err) // uses %w for proper error wrapping
    }
    defer rows.Close()
    // ...
    return nil
}, userID)

func ExecuteInTx

func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error)

ExecuteInTx runs fn inside tx. This method is primarily intended for internal use. See other packages for higher-level, framework-specific ExecuteTx() functions.

*WARNING*: It is assumed that no statements have been executed on the supplied Tx. ExecuteInTx will only retry statements that are performed within the supplied closure (fn). Any statements performed on the tx before ExecuteInTx is invoked will *not* be re-run if the transaction needs to be retried.

fn is subject to the same restrictions as the fn passed to ExecuteTx.

func ExecuteTx

func ExecuteTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, fn func(*sql.Tx) error) error

ExecuteTx runs fn inside a transaction and retries it as needed. On non-retryable failures, the transaction is aborted and rolled back; on success, the transaction is committed.

There are cases where the state of a transaction is inherently ambiguous: if we err on RELEASE with a communication error it's unclear if the transaction has been committed or not (similar to erroring on COMMIT in other databases). In that case, we return AmbiguousCommitError.

There are cases when restarting a transaction fails: we err on ROLLBACK to the SAVEPOINT. In that case, we return a TxnRestartError.

For more information about CockroachDB's transaction model, see https://cockroachlabs.com/docs/stable/transactions.html.

NOTE: the supplied fn closure should not have external side effects beyond changes to the database.

fn must take care when wrapping errors returned from the database driver with additional context. For example, if the UPDATE statement fails in the following snippet, the original retryable error will be masked by the call to fmt.Errorf, and the transaction will not be automatically retried.

crdb.ExecuteTx(ctx, db, txopts, func (tx *sql.Tx) error {
    if err := tx.ExecContext(ctx, "UPDATE..."); err != nil {
        return fmt.Errorf("updating record: %s", err)
    }
    return nil
})

Instead, add context by returning an error that implements either: - a `Cause() error` method, in the manner of github.com/pkg/errors, or - an `Unwrap() error` method, in the manner of the Go 1.13 standard library.

To achieve this, you can implement your own error type, or use `errors.Wrap()` from github.com/pkg/errors or github.com/cockroachdb/errors or a similar package, or use go 1.13's special `%w` formatter with fmt.Errorf(), for example fmt.Errorf("scanning row: %w", err).

import "github.com/pkg/errors"

crdb.ExecuteTx(ctx, db, txopts, func (tx *sql.Tx) error {
    if err := tx.ExecContext(ctx, "UPDATE..."); err != nil {
        return errors.Wrap(err, "updating record")
    }
    return nil
})

func ExecuteTxGenericTest

func ExecuteTxGenericTest(ctx context.Context, framework WriteSkewTest) error

ExecuteTxGenericTest represents the structure of a test for the ExecuteTx function. The actual database operations are abstracted by framework; the idea is that tests for different frameworks implement that interface and then invoke this test.

The test interleaves two transactions such that one of them will require a restart because of write skew.

func WithMaxRetries added in v2.2.8

func WithMaxRetries(ctx context.Context, retries int) context.Context

WithMaxRetries configures context so that ExecuteTx retries the transaction up to the specified number of times when encountering retryable errors.

The retries parameter controls retry behavior:

  • Positive value (e.g., 10): Retry up to that many times before failing
  • 0 (UnlimitedRetries): Retry indefinitely until success or non-retryable error (not recommended in production as it can lead to infinite retry loops)

This is a convenience function that creates a LimitBackoffRetryPolicy with no delay between retries (immediate retries).

Example with limited retries:

ctx := crdb.WithMaxRetries(context.Background(), 10)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
    // Will retry up to 10 times on retryable errors
    return tx.ExecContext(ctx, "UPDATE ...")
})

Example with unlimited retries (use with caution):

ctx := crdb.WithMaxRetries(context.Background(), 0)
// Will retry indefinitely - ensure you have a context timeout!

To disable retries entirely, use WithNoRetries(ctx) instead.

func WithNoRetries added in v2.4.3

func WithNoRetries(ctx context.Context) context.Context

WithNoRetries configures context so that ExecuteTx will not retry on retryable errors. The transaction will be attempted exactly once.

This is useful when you want to handle retries manually or when operating in a context where automatic retries are not desired (e.g., in testing, or when implementing custom retry logic).

Example usage:

ctx := crdb.WithNoRetries(context.Background())
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
    // This will execute only once, no automatic retries
    return tx.ExecContext(ctx, "UPDATE ...")
})
if err != nil {
    // Handle error manually, potentially implementing custom retry logic
}

func WithRetryPolicy added in v2.4.3

func WithRetryPolicy(ctx context.Context, policy RetryPolicy) context.Context

WithRetryPolicy uses an arbitrary retry policy to perform retries.

Types

type AmbiguousCommitError

type AmbiguousCommitError struct {
	// contains filtered or unexported fields
}

AmbiguousCommitError represents an error that left a transaction in an ambiguous state: unclear if it committed or not.

func (*AmbiguousCommitError) Cause

func (e *AmbiguousCommitError) Cause() error

Cause implements the pkg/errors causer interface.

func (*AmbiguousCommitError) Error

func (e *AmbiguousCommitError) Error() string

Error implements the error interface.

func (*AmbiguousCommitError) Unwrap added in v2.0.2

func (e *AmbiguousCommitError) Unwrap() error

Unwrap implements the go error causer interface.

type ExecuteCtxFunc added in v2.4.0

type ExecuteCtxFunc func(context.Context, ...interface{}) error

ExecuteCtxFunc represents a function that takes a context and variadic arguments and returns an error. It's used with ExecuteCtx to enable retryable operations with configurable parameters.

type ExpBackoffRetryPolicy added in v2.4.3

type ExpBackoffRetryPolicy struct {
	// RetryLimit controls the retry behavior:
	//   - Positive value: Maximum number of retries before returning MaxRetriesExceededError
	//   - UnlimitedRetries (0): Retry indefinitely
	//   - NoRetries (-1) or any negative value: Do not retry, fail immediately
	RetryLimit int

	// BaseDelay is the initial delay before the first retry. Each subsequent
	// retry doubles this value: delay = BaseDelay * 2^(attempt-1).
	BaseDelay time.Duration

	// MaxDelay is the maximum delay cap. If > 0, delays are capped at this
	// value once reached. If 0, delays grow unbounded (until overflow, which
	// causes early termination).
	MaxDelay time.Duration
}

ExpBackoffRetryPolicy implements RetryPolicy using an exponential backoff strategy where delays double with each retry attempt, with an optional maximum delay cap.

The delay between retries doubles with each attempt, starting from BaseDelay:

  • Retry 1: BaseDelay
  • Retry 2: BaseDelay * 2
  • Retry 3: BaseDelay * 4
  • Retry N: BaseDelay * 2^(N-1)

If MaxDelay is set (> 0), the delay is capped at that value once reached. This prevents excessive wait times during high retry counts and provides a predictable upper bound for backoff duration.

The RetryLimit field controls retry behavior:

  • Positive value (e.g., 10): Retry up to that many times before failing
  • UnlimitedRetries (0): Retry indefinitely until success or non-retryable error
  • NoRetries (-1) or any negative value: Do not retry; fail immediately on first retryable error

When the limit is exceeded or if the delay calculation overflows without a MaxDelay set, it returns a MaxRetriesExceededError.

Example usage with capped exponential backoff:

policy := &ExpBackoffRetryPolicy{
    RetryLimit: 10,
    BaseDelay:  100 * time.Millisecond,
    MaxDelay:   5 * time.Second,
}
ctx := crdb.WithRetryPolicy(context.Background(), policy)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
    // transaction logic that may encounter retryable errors
    return tx.ExecContext(ctx, "UPDATE ...")
})

This configuration produces delays: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, then stays at 5s for all subsequent retries.

Example usage with unbounded exponential backoff:

policy := &ExpBackoffRetryPolicy{
    RetryLimit: 5,
    BaseDelay:  1 * time.Second,
    MaxDelay:   0,  // no cap
}

This configuration produces delays: 1s, 2s, 4s, 8s, 16s. Note: Setting MaxDelay to 0 means no cap, but be aware that delay overflow will cause the policy to fail early.

func (*ExpBackoffRetryPolicy) NewRetry added in v2.4.3

func (l *ExpBackoffRetryPolicy) NewRetry() RetryFunc

NewRetry implements RetryPolicy.

type ExternalBackoff added in v2.4.3

type ExternalBackoff interface {
	// Next returns the next delay duration and whether to stop retrying.
	// When stop is true, no more retries will be attempted.
	Next() (next time.Duration, stop bool)
}

ExternalBackoff is an interface for external backoff strategies that provide delays through a Next() method. This allows adaptation of backoff policies from libraries like github.com/sethvargo/go-retry without creating a direct dependency.

Next returns the next backoff duration and a boolean indicating whether to stop retrying. When stop is true, the retry loop terminates with a MaxRetriesExceededError.

type LimitBackoffRetryPolicy added in v2.4.3

type LimitBackoffRetryPolicy struct {
	// RetryLimit controls the retry behavior:
	//   - Positive value: Maximum number of retries before returning MaxRetriesExceededError
	//   - UnlimitedRetries (0): Retry indefinitely
	//   - NoRetries (-1) or any negative value: Do not retry, fail immediately
	RetryLimit int

	// Delay is the fixed duration to wait between retry attempts. If 0,
	// retries happen immediately without delay.
	Delay time.Duration
}

LimitBackoffRetryPolicy implements RetryPolicy with a configurable retry limit and optional constant delay between retries.

The RetryLimit field controls retry behavior:

  • Positive value (e.g., 10): Retry up to that many times before failing
  • UnlimitedRetries (0): Retry indefinitely until success or non-retryable error
  • NoRetries (-1) or any negative value: Do not retry; fail immediately on first retryable error

If Delay is greater than zero, the policy will wait for the specified duration between retry attempts.

Example usage with limited retries and no delay:

policy := &LimitBackoffRetryPolicy{
    RetryLimit: 10,
    Delay:      0,
}
ctx := crdb.WithRetryPolicy(context.Background(), policy)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
    // transaction logic
})

Example usage with fixed delay between retries:

policy := &LimitBackoffRetryPolicy{
    RetryLimit: 5,
    Delay:      100 * time.Millisecond,
}
ctx := crdb.WithRetryPolicy(context.Background(), policy)

Example usage with unlimited retries:

policy := &LimitBackoffRetryPolicy{
    RetryLimit: UnlimitedRetries,  // or 0
    Delay:      50 * time.Millisecond,
}

Note: Convenience functions are available:

  • WithMaxRetries(ctx, n) creates a LimitBackoffRetryPolicy with RetryLimit=n and Delay=0
  • WithNoRetries(ctx) creates a LimitBackoffRetryPolicy with RetryLimit=NoRetries

func (*LimitBackoffRetryPolicy) NewRetry added in v2.4.3

func (l *LimitBackoffRetryPolicy) NewRetry() RetryFunc

NewRetry implements RetryPolicy.

type MaxRetriesExceededError added in v2.0.8

type MaxRetriesExceededError struct {
	// contains filtered or unexported fields
}

MaxRetriesExceededError represents an error caused by retying the transaction too many times, without it ever succeeding.

func (*MaxRetriesExceededError) Cause added in v2.0.8

func (e *MaxRetriesExceededError) Cause() error

Cause implements the pkg/errors causer interface.

func (*MaxRetriesExceededError) Error added in v2.0.8

func (e *MaxRetriesExceededError) Error() string

Error implements the error interface.

func (*MaxRetriesExceededError) Unwrap added in v2.0.8

func (e *MaxRetriesExceededError) Unwrap() error

Unwrap implements the go error causer interface.

type RetryFunc added in v2.4.3

type RetryFunc func(err error) (time.Duration, error)

RetryFunc owns the state for a transaction retry operation. Usually, this is just the retry count. RetryFunc is not assumed to be safe for concurrent use.

The function is called after each retryable error to determine whether to retry and how long to wait. It receives the retryable error that triggered the retry attempt.

Return values:

  • duration: The delay to wait before the next retry attempt. If 0, retry immediately without delay.
  • error: If non-nil, stops retrying and returns this error to the caller (typically a MaxRetriesExceededError). If nil, the retry will proceed after the specified duration.

Example behavior:

  • (100ms, nil): Wait 100ms, then retry
  • (0, nil): Retry immediately
  • (0, err): Stop retrying, return err to caller

type RetryPolicy added in v2.4.3

type RetryPolicy interface {
	NewRetry() RetryFunc
}

RetryPolicy constructs a new instance of a RetryFunc for each transaction it is used with. Instances of RetryPolicy can likely be immutable and should be safe for concurrent calls to NewRetry.

func ExternalBackoffPolicy added in v2.4.3

func ExternalBackoffPolicy(fn func() ExternalBackoff) RetryPolicy

ExternalBackoffPolicy adapts third-party backoff strategies (like those from github.com/sethvargo/go-retry) into a RetryPolicy without creating a direct dependency on those libraries.

This function allows you to use any backoff implementation that conforms to the ExternalBackoff interface, providing flexibility to integrate external retry strategies with CockroachDB transaction retries.

Example usage with a hypothetical external backoff library:

import retry "github.com/sethvargo/go-retry"

// Create a retry policy using an external backoff strategy
policy := crdb.ExternalBackoffPolicy(func() crdb.ExternalBackoff {
    // Fibonacci backoff: 1s, 1s, 2s, 3s, 5s, 8s...
    return retry.NewFibonacci(1 * time.Second)
})
ctx := crdb.WithRetryPolicy(context.Background(), policy)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
    // transaction logic
})

The function parameter should return a fresh ExternalBackoff instance for each transaction, as backoff state is not safe for concurrent use.

type Tx

type Tx interface {
	Exec(context.Context, string, ...interface{}) error
	Commit(context.Context) error
	Rollback(context.Context) error
}

Tx abstracts the operations needed by ExecuteInTx so that different frameworks (e.g. go's sql package, pgx, gorm) can be used with ExecuteInTx.

type TxnRestartError

type TxnRestartError struct {
	// contains filtered or unexported fields
}

TxnRestartError represents an error when restarting a transaction. `cause` is the error from restarting the txn and `retryCause` is the original error which triggered the restart.

func (*TxnRestartError) Cause

func (e *TxnRestartError) Cause() error

Cause implements the pkg/errors causer interface.

func (*TxnRestartError) Error

func (e *TxnRestartError) Error() string

Error implements the error interface.

func (*TxnRestartError) RetryCause

func (e *TxnRestartError) RetryCause() error

RetryCause returns the error that caused the transaction to be restarted.

func (*TxnRestartError) Unwrap added in v2.0.2

func (e *TxnRestartError) Unwrap() error

Unwrap implements the go error causer interface.

type WriteSkewTest

type WriteSkewTest interface {
	Init(context.Context) error
	ExecuteTx(ctx context.Context, fn func(tx interface{}) error) error
	GetBalances(ctx context.Context, tx interface{}) (bal1, bal2 int, err error)
	UpdateBalance(ctx context.Context, tx interface{}, acct, delta int) error
}

WriteSkewTest abstracts the operations that needs to be performed by a particular framework for the purposes of TestExecuteTx. This allows the test to be written once and run for any framework supported by this library.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL