Documentation
Overview ¶
Package crdb provides helpers for using CockroachDB in client applications.
Index ¶
- Constants
- func Execute(fn func() error) (err error)
- func ExecuteCtx(ctx context.Context, fn ExecuteCtxFunc, args ...interface{}) (err error)
- func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error)
- func ExecuteTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, fn func(*sql.Tx) error) error
- func ExecuteTxGenericTest(ctx context.Context, framework WriteSkewTest) error
- func WithMaxRetries(ctx context.Context, retries int) context.Context
- func WithNoRetries(ctx context.Context) context.Context
- func WithRetryPolicy(ctx context.Context, policy RetryPolicy) context.Context
- type AmbiguousCommitError
- type ExecuteCtxFunc
- type ExpBackoffRetryPolicy
- type ExternalBackoff
- type LimitBackoffRetryPolicy
- type MaxRetriesExceededError
- type RetryFunc
- type RetryPolicy
- type Tx
- type TxnRestartError
- type WriteSkewTest
Constants ¶
const (
	// NoRetries is a sentinel value for LimitBackoffRetryPolicy.RetryLimit
	// indicating that no retries should be attempted. When a policy has
	// RetryLimit set to NoRetries, the transaction will be attempted only
	// once, and any retryable error will immediately return a
	// MaxRetriesExceededError.
	//
	// Use WithNoRetries(ctx) to create a context with this behavior.
	NoRetries = -1

	// UnlimitedRetries indicates that retries should continue indefinitely
	// until the transaction succeeds or a non-retryable error occurs. This
	// is represented by setting RetryLimit to 0.
	//
	// Use WithMaxRetries(ctx, 0) to create a context with unlimited retries,
	// though this is generally not recommended in production as it can lead
	// to infinite retry loops.
	UnlimitedRetries = 0
)
Variables ¶
This section is empty.
Functions ¶
func Execute ¶
func Execute(fn func() error) (err error)
Execute runs fn and retries it as needed. It is used to add retry handling to the execution of a single statement. If a multi-statement transaction is being run, use ExecuteTx instead.
Retry handling for individual statements (implicit transactions) is usually performed automatically on the CockroachDB SQL gateway. As such, use of this function is generally not necessary. The exception to this rule is that automatic retries for individual statements are disabled once CockroachDB begins streaming results for the statements back to the client. By default, result streaming does not begin until the size of the result being produced for the client, including protocol overhead, exceeds 16KiB. As long as the results of a single statement or batch of statements are known to stay clear of this limit, the client does not need to worry about retries and should not need to use this function.
For more information about automatic transaction retries in CockroachDB, see https://cockroachlabs.com/docs/stable/transactions.html#automatic-retries.
NOTE: the supplied fn closure should not have external side effects beyond changes to the database.
fn must take care when wrapping errors returned from the database driver with additional context. For example, if the SELECT statement fails in the following snippet, the original retryable error will be masked by the call to fmt.Errorf, and the transaction will not be automatically retried.
crdb.Execute(func() error {
	rows, err := db.QueryContext(ctx, "SELECT ...")
	if err != nil {
		return fmt.Errorf("scanning row: %s", err)
	}
	defer rows.Close()
	for rows.Next() {
		// ...
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("scanning row: %s", err)
	}
	return nil
})
Instead, add context by returning an error that implements either:
- a `Cause() error` method, in the manner of github.com/pkg/errors, or
- an `Unwrap() error` method, in the manner of the Go 1.13 standard library.
To achieve this, you can implement your own error type, use `errors.Wrap()` from github.com/pkg/errors, github.com/cockroachdb/errors, or a similar package, or use the `%w` verb that Go 1.13 added to fmt.Errorf(), for example fmt.Errorf("scanning row: %w", err).
import "github.com/pkg/errors"

crdb.Execute(func() error {
	rows, err := db.QueryContext(ctx, "SELECT ...")
	if err != nil {
		return errors.Wrap(err, "scanning row")
	}
	defer rows.Close()
	for rows.Next() {
		// ...
	}
	if err := rows.Err(); err != nil {
		return errors.Wrap(err, "scanning row")
	}
	return nil
})
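The difference between `%s` and `%w` can be checked with the standard library alone. The sketch below is illustrative only: retryableError, maskWithS, wrapWithW, and isRetryable are hypothetical names, and the real package may detect retryable errors differently. It shows that only the `%w`-wrapped error still exposes the original error through its Unwrap chain.

```go
package main

import (
	"errors"
	"fmt"
)

// retryableError is a hypothetical stand-in for a driver error that signals
// a retryable failure. It is not a type exported by this package.
type retryableError struct{ code string }

func (e *retryableError) Error() string {
	return "restart transaction (SQLSTATE " + e.code + ")"
}

// maskWithS adds context with %s, which flattens err into a plain string.
func maskWithS(err error) error { return fmt.Errorf("scanning row: %s", err) }

// wrapWithW adds context with %w, which keeps err reachable via Unwrap.
func wrapWithW(err error) error { return fmt.Errorf("scanning row: %w", err) }

// isRetryable reports whether a *retryableError can be found anywhere in
// err's Unwrap chain.
func isRetryable(err error) bool {
	var re *retryableError
	return errors.As(err, &re)
}

func main() {
	orig := &retryableError{code: "40001"}
	fmt.Println(isRetryable(maskWithS(orig))) // false
	fmt.Println(isRetryable(wrapWithW(orig))) // true
}
```

Both messages read the same when printed; only the wrapped one can still be inspected by retry logic.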
func ExecuteCtx ¶ added in v2.4.0
func ExecuteCtx(ctx context.Context, fn ExecuteCtxFunc, args ...interface{}) (err error)
ExecuteCtx runs fn and retries it as needed, respecting a retry policy obtained from the context. It is used to add configurable retry handling to the execution of a single statement. If a multi-statement transaction is being run, use ExecuteTx instead.
The maximum number of retries can be configured using WithMaxRetries(ctx, n). Setting n=0 (UnlimitedRetries) retries indefinitely; to allow a single attempt with no retries, use WithNoRetries(ctx). If the number of retries is exhausted and the last attempt resulted in a retryable error, ExecuteCtx returns a MaxRetriesExceededError wrapping the last retryable error encountered.
Arbitrary retry policies can be configured using WithRetryPolicy(ctx, p).
The fn parameter accepts variadic arguments which are passed through on each retry attempt, allowing for flexible parameterization of the retried operation.
As with Execute, retry handling for individual statements (implicit transactions) is usually performed automatically on the CockroachDB SQL gateway, making use of this function generally unnecessary. However, automatic retries are disabled once result streaming begins (typically when results exceed 16KiB).
NOTE: the supplied fn closure should not have external side effects beyond changes to the database.
fn must take care when wrapping errors returned from the database driver with additional context. To preserve retry behavior, errors should implement either `Cause() error` (github.com/pkg/errors) or `Unwrap() error` (Go 1.13+). For example:
crdb.ExecuteCtx(ctx, func(ctx context.Context, args ...interface{}) error {
	id := args[0].(int)
	rows, err := db.QueryContext(ctx, "SELECT * FROM users WHERE id = $1", id)
	if err != nil {
		return fmt.Errorf("scanning row: %w", err) // uses %w for proper error wrapping
	}
	defer rows.Close()
	// ...
	return nil
}, userID)
func ExecuteInTx ¶
func ExecuteInTx(ctx context.Context, tx Tx, fn func() error) (err error)
ExecuteInTx runs fn inside tx. This method is primarily intended for internal use. See other packages for higher-level, framework-specific ExecuteTx() functions.
*WARNING*: It is assumed that no statements have been executed on the supplied Tx. ExecuteInTx will only retry statements that are performed within the supplied closure (fn). Any statements performed on the tx before ExecuteInTx is invoked will *not* be re-run if the transaction needs to be retried.
fn is subject to the same restrictions as the fn passed to ExecuteTx.
func ExecuteTx ¶
func ExecuteTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, fn func(*sql.Tx) error) error
ExecuteTx runs fn inside a transaction and retries it as needed. On non-retryable failures, the transaction is aborted and rolled back; on success, the transaction is committed.
There are cases where the state of a transaction is inherently ambiguous: if we err on RELEASE with a communication error it's unclear if the transaction has been committed or not (similar to erroring on COMMIT in other databases). In that case, we return AmbiguousCommitError.
There are cases when restarting a transaction fails: we err on ROLLBACK to the SAVEPOINT. In that case, we return a TxnRestartError.
For more information about CockroachDB's transaction model, see https://cockroachlabs.com/docs/stable/transactions.html.
NOTE: the supplied fn closure should not have external side effects beyond changes to the database.
fn must take care when wrapping errors returned from the database driver with additional context. For example, if the UPDATE statement fails in the following snippet, the original retryable error will be masked by the call to fmt.Errorf, and the transaction will not be automatically retried.
crdb.ExecuteTx(ctx, db, txopts, func(tx *sql.Tx) error {
	// ExecContext returns (sql.Result, error); the result is not needed here.
	if _, err := tx.ExecContext(ctx, "UPDATE..."); err != nil {
		return fmt.Errorf("updating record: %s", err)
	}
	return nil
})
Instead, add context by returning an error that implements either:
- a `Cause() error` method, in the manner of github.com/pkg/errors, or
- an `Unwrap() error` method, in the manner of the Go 1.13 standard library.
To achieve this, you can implement your own error type, use `errors.Wrap()` from github.com/pkg/errors, github.com/cockroachdb/errors, or a similar package, or use the `%w` verb that Go 1.13 added to fmt.Errorf(), for example fmt.Errorf("updating record: %w", err).
import "github.com/pkg/errors"

crdb.ExecuteTx(ctx, db, txopts, func(tx *sql.Tx) error {
	// ExecContext returns (sql.Result, error); the result is not needed here.
	if _, err := tx.ExecContext(ctx, "UPDATE..."); err != nil {
		return errors.Wrap(err, "updating record")
	}
	return nil
})
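The RELEASE and ROLLBACK TO SAVEPOINT steps mentioned above can be pictured with a small simulation. This is a minimal sketch, not this package's implementation: recordingTx, runInSavepoint, demo, and errRetryable are hypothetical names, the fake transaction only records statements, and the savepoint name cockroach_restart is assumed from CockroachDB's client-side retry protocol. The final COMMIT or ROLLBACK of the surrounding transaction is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errRetryable is a hypothetical marker for a retryable failure.
var errRetryable = errors.New("retryable")

// recordingTx is a fake transaction that only records the statements it is
// asked to run. It stands in for a real database connection.
type recordingTx struct{ stmts []string }

func (t *recordingTx) Exec(_ context.Context, stmt string) error {
	t.stmts = append(t.stmts, stmt)
	return nil
}

// runInSavepoint opens a savepoint, runs fn, releases the savepoint on
// success, and rolls back to the savepoint before retrying on a retryable
// error.
func runInSavepoint(ctx context.Context, tx *recordingTx, fn func() error) error {
	if err := tx.Exec(ctx, "SAVEPOINT cockroach_restart"); err != nil {
		return err
	}
	for {
		err := fn()
		if err == nil {
			// A failure here would leave the commit status ambiguous.
			return tx.Exec(ctx, "RELEASE SAVEPOINT cockroach_restart")
		}
		if !errors.Is(err, errRetryable) {
			return err
		}
		// A failure here corresponds to the restart-failed case.
		if rbErr := tx.Exec(ctx, "ROLLBACK TO SAVEPOINT cockroach_restart"); rbErr != nil {
			return fmt.Errorf("restarting transaction: %w", rbErr)
		}
	}
}

// demo fails on the first attempt and succeeds on the second.
func demo() ([]string, error) {
	tx := &recordingTx{}
	attempts := 0
	err := runInSavepoint(context.Background(), tx, func() error {
		attempts++
		if attempts == 1 {
			return errRetryable
		}
		return nil
	})
	return tx.stmts, err
}

func main() {
	stmts, err := demo()
	fmt.Println(stmts, err)
}
```

Only statements issued inside fn are replayed after the rollback, which is why work done on the transaction before the loop starts is not retried.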
func ExecuteTxGenericTest ¶
func ExecuteTxGenericTest(ctx context.Context, framework WriteSkewTest) error
ExecuteTxGenericTest represents the structure of a test for the ExecuteTx function. The actual database operations are abstracted by framework; the idea is that tests for different frameworks implement that interface and then invoke this test.
The test interleaves two transactions such that one of them will require a restart because of write skew.
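Write skew itself can be shown without a database. The sketch below is a toy model, not the test this package runs: accounts, withdrawDelta, and interleave are hypothetical names, and the amounts are arbitrary. Two transactions read the same snapshot, each passes the balance check, and each writes a different account; applying both writes breaks the invariant, which is why a serializable database forces one of them to restart.

```go
package main

import "fmt"

// accounts holds two balances. The intended invariant is bal1+bal2 >= 0.
type accounts struct{ bal1, bal2 int }

// withdrawDelta decides on a withdrawal using balances read earlier. It
// returns the change to apply, or 0 when the combined balance is too low.
func withdrawDelta(snapshot accounts, amount int) int {
	if snapshot.bal1+snapshot.bal2 >= amount {
		return -amount
	}
	return 0
}

// interleave models two transactions that both read before either writes.
func interleave(start accounts, amount int) accounts {
	snap1, snap2 := start, start // both transactions see the same state
	final := start
	final.bal1 += withdrawDelta(snap1, amount) // transaction 1 writes account 1
	final.bal2 += withdrawDelta(snap2, amount) // transaction 2 writes account 2
	return final
}

func main() {
	final := interleave(accounts{bal1: 100, bal2: 100}, 200)
	fmt.Println(final.bal1 + final.bal2) // -200: the invariant is broken
}
```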
func WithMaxRetries ¶ added in v2.2.8
func WithMaxRetries(ctx context.Context, retries int) context.Context
WithMaxRetries configures context so that ExecuteTx retries the transaction up to the specified number of times when encountering retryable errors.
The retries parameter controls retry behavior:
- Positive value (e.g., 10): Retry up to that many times before failing
- 0 (UnlimitedRetries): Retry indefinitely until success or non-retryable error (not recommended in production as it can lead to infinite retry loops)
This is a convenience function that creates a LimitBackoffRetryPolicy with no delay between retries (immediate retries).
Example with limited retries:
ctx := crdb.WithMaxRetries(context.Background(), 10)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
	// Will retry up to 10 times on retryable errors
	_, err := tx.ExecContext(ctx, "UPDATE ...")
	return err
})
Example with unlimited retries (use with caution):
ctx := crdb.WithMaxRetries(context.Background(), 0) // Will retry indefinitely - ensure you have a context timeout!
To disable retries entirely, use WithNoRetries(ctx) instead.
func WithNoRetries ¶ added in v2.4.3
func WithNoRetries(ctx context.Context) context.Context
WithNoRetries configures context so that ExecuteTx will not retry on retryable errors. The transaction will be attempted exactly once.
This is useful when you want to handle retries manually or when operating in a context where automatic retries are not desired (e.g., in testing, or when implementing custom retry logic).
Example usage:
ctx := crdb.WithNoRetries(context.Background())
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
	// This will execute only once, no automatic retries
	_, err := tx.ExecContext(ctx, "UPDATE ...")
	return err
})
if err != nil {
	// Handle error manually, potentially implementing custom retry logic
}
func WithRetryPolicy ¶ added in v2.4.3
func WithRetryPolicy(ctx context.Context, policy RetryPolicy) context.Context
WithRetryPolicy configures context so that ExecuteTx and ExecuteCtx use the supplied RetryPolicy to decide whether, and after what delay, to retry.
Types ¶
type AmbiguousCommitError ¶
type AmbiguousCommitError struct {
	// contains filtered or unexported fields
}
AmbiguousCommitError represents an error that left a transaction in an ambiguous state: unclear if it committed or not.
func (*AmbiguousCommitError) Cause ¶
func (e *AmbiguousCommitError) Cause() error
Cause implements the pkg/errors causer interface.
type ExecuteCtxFunc ¶ added in v2.4.0
ExecuteCtxFunc represents a function that takes a context and variadic arguments and returns an error. It's used with ExecuteCtx to enable retryable operations with configurable parameters.
type ExpBackoffRetryPolicy ¶ added in v2.4.3
type ExpBackoffRetryPolicy struct {
	// RetryLimit controls the retry behavior:
	//   - Positive value: Maximum number of retries before returning MaxRetriesExceededError
	//   - UnlimitedRetries (0): Retry indefinitely
	//   - NoRetries (-1) or any negative value: Do not retry, fail immediately
	RetryLimit int
	// BaseDelay is the initial delay before the first retry. Each subsequent
	// retry doubles this value: delay = BaseDelay * 2^(attempt-1).
	BaseDelay time.Duration
	// MaxDelay is the maximum delay cap. If > 0, delays are capped at this
	// value once reached. If 0, delays grow unbounded (until overflow, which
	// causes early termination).
	MaxDelay time.Duration
}
ExpBackoffRetryPolicy implements RetryPolicy using an exponential backoff strategy where delays double with each retry attempt, with an optional maximum delay cap.
The delay between retries doubles with each attempt, starting from BaseDelay:
- Retry 1: BaseDelay
- Retry 2: BaseDelay * 2
- Retry 3: BaseDelay * 4
- Retry N: BaseDelay * 2^(N-1)
If MaxDelay is set (> 0), the delay is capped at that value once reached. This prevents excessive wait times during high retry counts and provides a predictable upper bound for backoff duration.
The RetryLimit field controls retry behavior:
- Positive value (e.g., 10): Retry up to that many times before failing
- UnlimitedRetries (0): Retry indefinitely until success or non-retryable error
- NoRetries (-1) or any negative value: Do not retry; fail immediately on first retryable error
When the limit is exceeded or if the delay calculation overflows without a MaxDelay set, it returns a MaxRetriesExceededError.
Example usage with capped exponential backoff:
policy := &crdb.ExpBackoffRetryPolicy{
	RetryLimit: 10,
	BaseDelay:  100 * time.Millisecond,
	MaxDelay:   5 * time.Second,
}
ctx := crdb.WithRetryPolicy(context.Background(), policy)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
	// transaction logic that may encounter retryable errors
	_, err := tx.ExecContext(ctx, "UPDATE ...")
	return err
})
This configuration produces delays: 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, then stays at 5s for all subsequent retries.
Example usage with unbounded exponential backoff:
policy := &crdb.ExpBackoffRetryPolicy{
	RetryLimit: 5,
	BaseDelay:  1 * time.Second,
	MaxDelay:   0, // no cap
}
This configuration produces delays: 1s, 2s, 4s, 8s, 16s. Note: Setting MaxDelay to 0 means no cap, but be aware that delay overflow will cause the policy to fail early.
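The delay schedules quoted above can be reproduced with a few lines of arithmetic. This is a minimal sketch of the documented formula BaseDelay * 2^(N-1) with an optional cap, not the package's own code; backoffDelay is a hypothetical helper, and reporting overflow through a boolean is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay returns the delay before retry number n (1-based), computed
// as base * 2^(n-1) and capped at maxDelay when maxDelay > 0. The boolean is
// false when the delay would overflow and there is no cap to fall back on.
func backoffDelay(base, maxDelay time.Duration, n int) (time.Duration, bool) {
	d := base
	for i := 1; i < n; i++ {
		if d > math.MaxInt64/2 { // doubling would overflow int64
			if maxDelay > 0 {
				return maxDelay, true
			}
			return 0, false
		}
		d *= 2
		if maxDelay > 0 && d >= maxDelay {
			return maxDelay, true
		}
	}
	if maxDelay > 0 && d > maxDelay {
		return maxDelay, true
	}
	return d, true
}

func main() {
	// Same parameters as the capped example: 100ms base, 5s cap.
	for n := 1; n <= 8; n++ {
		d, ok := backoffDelay(100*time.Millisecond, 5*time.Second, n)
		fmt.Println(n, d, ok)
	}
}
```

With a 100ms base and a 5s cap, retry 6 waits 3.2s and retry 7 is the first to hit the cap, since doubling again would give 6.4s.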
func (*ExpBackoffRetryPolicy) NewRetry ¶ added in v2.4.3
func (l *ExpBackoffRetryPolicy) NewRetry() RetryFunc
NewRetry implements RetryPolicy.
type ExternalBackoff ¶ added in v2.4.3
type ExternalBackoff interface {
	// Next returns the next delay duration and whether to stop retrying.
	// When stop is true, no more retries will be attempted.
	Next() (next time.Duration, stop bool)
}
ExternalBackoff is an interface for external backoff strategies that provide delays through a Next() method. This allows adaptation of backoff policies from libraries like github.com/sethvargo/go-retry without creating a direct dependency.
Next returns the next backoff duration and a boolean indicating whether to stop retrying. When stop is true, the retry loop terminates with a MaxRetriesExceededError.
type LimitBackoffRetryPolicy ¶ added in v2.4.3
type LimitBackoffRetryPolicy struct {
	// RetryLimit controls the retry behavior:
	//   - Positive value: Maximum number of retries before returning MaxRetriesExceededError
	//   - UnlimitedRetries (0): Retry indefinitely
	//   - NoRetries (-1) or any negative value: Do not retry, fail immediately
	RetryLimit int
	// Delay is the fixed duration to wait between retry attempts. If 0,
	// retries happen immediately without delay.
	Delay time.Duration
}
LimitBackoffRetryPolicy implements RetryPolicy with a configurable retry limit and optional constant delay between retries.
The RetryLimit field controls retry behavior:
- Positive value (e.g., 10): Retry up to that many times before failing
- UnlimitedRetries (0): Retry indefinitely until success or non-retryable error
- NoRetries (-1) or any negative value: Do not retry; fail immediately on first retryable error
If Delay is greater than zero, the policy will wait for the specified duration between retry attempts.
Example usage with limited retries and no delay:
policy := &crdb.LimitBackoffRetryPolicy{
	RetryLimit: 10,
	Delay:      0,
}
ctx := crdb.WithRetryPolicy(context.Background(), policy)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
	// transaction logic
	return nil
})
Example usage with fixed delay between retries:
policy := &crdb.LimitBackoffRetryPolicy{
	RetryLimit: 5,
	Delay:      100 * time.Millisecond,
}
ctx := crdb.WithRetryPolicy(context.Background(), policy)
Example usage with unlimited retries:
policy := &crdb.LimitBackoffRetryPolicy{
	RetryLimit: crdb.UnlimitedRetries, // or 0
	Delay:      50 * time.Millisecond,
}
Note: Convenience functions are available:
- WithMaxRetries(ctx, n) creates a LimitBackoffRetryPolicy with RetryLimit=n and Delay=0
- WithNoRetries(ctx) creates a LimitBackoffRetryPolicy with RetryLimit=NoRetries
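The three RetryLimit cases listed above reduce to a small decision function. The following is a sketch of the documented rules only, not the package's code; allowRetry and the lowercase constants are hypothetical local names that mirror NoRetries and UnlimitedRetries.

```go
package main

import "fmt"

const (
	noRetries        = -1 // mirrors NoRetries
	unlimitedRetries = 0  // mirrors UnlimitedRetries
)

// allowRetry reports whether retry number n (1-based) is permitted under the
// given limit: negative limits never retry, zero always retries, and a
// positive limit allows retries 1 through limit.
func allowRetry(limit, n int) bool {
	switch {
	case limit < 0:
		return false
	case limit == unlimitedRetries:
		return true
	default:
		return n <= limit
	}
}

func main() {
	fmt.Println(allowRetry(noRetries, 1))           // false
	fmt.Println(allowRetry(unlimitedRetries, 1000)) // true
	fmt.Println(allowRetry(3, 3))                   // true
	fmt.Println(allowRetry(3, 4))                   // false
}
```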
func (*LimitBackoffRetryPolicy) NewRetry ¶ added in v2.4.3
func (l *LimitBackoffRetryPolicy) NewRetry() RetryFunc
NewRetry implements RetryPolicy.
type MaxRetriesExceededError ¶ added in v2.0.8
type MaxRetriesExceededError struct {
	// contains filtered or unexported fields
}
MaxRetriesExceededError represents an error caused by retrying the transaction too many times without it ever succeeding.
func (*MaxRetriesExceededError) Cause ¶ added in v2.0.8
func (e *MaxRetriesExceededError) Cause() error
Cause implements the pkg/errors causer interface.
func (*MaxRetriesExceededError) Error ¶ added in v2.0.8
func (e *MaxRetriesExceededError) Error() string
Error implements the error interface.
type RetryFunc ¶ added in v2.4.3
RetryFunc owns the state for a transaction retry operation. Usually, this is just the retry count. RetryFunc is not assumed to be safe for concurrent use.
The function is called after each retryable error to determine whether to retry and how long to wait. It receives the retryable error that triggered the retry attempt.
Return values:
- duration: The delay to wait before the next retry attempt. If 0, retry immediately without delay.
- error: If non-nil, stops retrying and returns this error to the caller (typically a MaxRetriesExceededError). If nil, the retry will proceed after the specified duration.
Example behavior:
- (100ms, nil): Wait 100ms, then retry
- (0, nil): Retry immediately
- (0, err): Stop retrying, return err to caller
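A custom policy following this contract might look like the sketch below. It is illustrative only: retryFunc is a local type whose signature is assumed from the description above (a retryable error in, a delay and an error out), and linearPolicy and errRetryable are hypothetical names. Each call to NewRetry returns a closure with its own counter, so state is never shared between transactions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRetryable is a hypothetical stand-in for a retryable database error.
var errRetryable = errors.New("retryable")

// retryFunc is a local type with the signature assumed from the description
// above: it receives the retryable error and returns a delay and an error.
type retryFunc func(err error) (time.Duration, error)

// linearPolicy is a hypothetical policy that waits step, 2*step, 3*step, ...
// and gives up after limit retries.
type linearPolicy struct {
	limit int
	step  time.Duration
}

// NewRetry returns a fresh retryFunc whose counter belongs to one
// transaction only.
func (p linearPolicy) NewRetry() retryFunc {
	tries := 0
	return func(err error) (time.Duration, error) {
		tries++
		if tries > p.limit {
			return 0, fmt.Errorf("giving up after %d retries: %w", p.limit, err)
		}
		return time.Duration(tries) * p.step, nil
	}
}

func main() {
	retry := linearPolicy{limit: 2, step: 100 * time.Millisecond}.NewRetry()
	for i := 0; i < 3; i++ {
		delay, err := retry(errRetryable)
		fmt.Println(delay, err)
	}
}
```

The third call returns a non-nil error that wraps the original with `%w`, so callers can still inspect the cause.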
type RetryPolicy ¶ added in v2.4.3
type RetryPolicy interface {
	NewRetry() RetryFunc
}
RetryPolicy constructs a new instance of a RetryFunc for each transaction it is used with. Instances of RetryPolicy can likely be immutable and should be safe for concurrent calls to NewRetry.
func ExternalBackoffPolicy ¶ added in v2.4.3
func ExternalBackoffPolicy(fn func() ExternalBackoff) RetryPolicy
ExternalBackoffPolicy adapts third-party backoff strategies (like those from github.com/sethvargo/go-retry) into a RetryPolicy without creating a direct dependency on those libraries.
This function allows you to use any backoff implementation that conforms to the ExternalBackoff interface, providing flexibility to integrate external retry strategies with CockroachDB transaction retries.
Example usage with a hypothetical external backoff library:
import retry "github.com/sethvargo/go-retry"

// Create a retry policy using an external backoff strategy
policy := crdb.ExternalBackoffPolicy(func() crdb.ExternalBackoff {
	// Fibonacci backoff: 1s, 1s, 2s, 3s, 5s, 8s...
	return retry.NewFibonacci(1 * time.Second)
})
ctx := crdb.WithRetryPolicy(context.Background(), policy)
err := crdb.ExecuteTx(ctx, db, nil, func(tx *sql.Tx) error {
	// transaction logic
	return nil
})
The function parameter should return a fresh ExternalBackoff instance for each transaction, as backoff state is not safe for concurrent use.
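A hand-written backoff can satisfy the same method set without any third-party library. The sketch below is an assumption-laden illustration: externalBackoff is a local copy of the ExternalBackoff method set so the example compiles on its own, and fixedBackoff and newBackoff are hypothetical names. The factory returns new state on every call, matching the requirement that each transaction gets a fresh instance.

```go
package main

import (
	"fmt"
	"time"
)

// externalBackoff is a local copy of the ExternalBackoff method set, declared
// here only so the sketch is self-contained.
type externalBackoff interface {
	Next() (next time.Duration, stop bool)
}

// fixedBackoff is a hypothetical backoff that returns the same delay a fixed
// number of times and then asks the caller to stop.
type fixedBackoff struct {
	delay     time.Duration
	remaining int
}

func (b *fixedBackoff) Next() (time.Duration, bool) {
	if b.remaining <= 0 {
		return 0, true
	}
	b.remaining--
	return b.delay, false
}

// newBackoff is the factory: every call returns independent state.
func newBackoff() externalBackoff {
	return &fixedBackoff{delay: 50 * time.Millisecond, remaining: 3}
}

func main() {
	b := newBackoff()
	for {
		d, stop := b.Next()
		if stop {
			fmt.Println("stop")
			return
		}
		fmt.Println("wait", d)
	}
}
```

Against the real package, a function of this shape would be the argument passed to ExternalBackoffPolicy, with the return type changed to crdb.ExternalBackoff.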
type Tx ¶
type Tx interface {
	Exec(context.Context, string, ...interface{}) error
	Commit(context.Context) error
	Rollback(context.Context) error
}
Tx abstracts the operations needed by ExecuteInTx so that different frameworks (e.g. Go's database/sql package, pgx, gorm) can be used with ExecuteInTx.
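As an illustration of what such an adapter involves, the sketch below wraps a *sql.Tx from the standard library. It is a hypothetical example rather than the adapter this package uses internally: txLike is a local copy of the Tx method set so the code compiles on its own, and sqlTxAdapter is an invented name.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// txLike is a local copy of the Tx method set shown above, declared here only
// so the sketch is self-contained.
type txLike interface {
	Exec(context.Context, string, ...interface{}) error
	Commit(context.Context) error
	Rollback(context.Context) error
}

// sqlTxAdapter is a hypothetical adapter from *sql.Tx to txLike.
type sqlTxAdapter struct{ tx *sql.Tx }

// Exec discards the sql.Result because the interface only reports errors.
func (a sqlTxAdapter) Exec(ctx context.Context, q string, args ...interface{}) error {
	_, err := a.tx.ExecContext(ctx, q, args...)
	return err
}

// Commit and Rollback ignore the context because *sql.Tx does not accept one
// for these calls.
func (a sqlTxAdapter) Commit(context.Context) error   { return a.tx.Commit() }
func (a sqlTxAdapter) Rollback(context.Context) error { return a.tx.Rollback() }

// Compile-time check that the adapter satisfies the interface.
var _ txLike = sqlTxAdapter{}

func main() {
	var tx txLike = sqlTxAdapter{}
	fmt.Printf("%T satisfies the interface\n", tx)
}
```

An adapter for another framework would follow the same pattern, mapping its own exec, commit, and rollback calls onto the three methods.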
type TxnRestartError ¶
type TxnRestartError struct {
	// contains filtered or unexported fields
}
TxnRestartError represents an error when restarting a transaction. `cause` is the error from restarting the txn and `retryCause` is the original error which triggered the restart.
func (*TxnRestartError) Cause ¶
func (e *TxnRestartError) Cause() error
Cause implements the pkg/errors causer interface.
func (*TxnRestartError) Error ¶
func (e *TxnRestartError) Error() string
Error implements the error interface.
func (*TxnRestartError) RetryCause ¶
func (e *TxnRestartError) RetryCause() error
RetryCause returns the error that caused the transaction to be restarted.
type WriteSkewTest ¶
type WriteSkewTest interface {
	Init(context.Context) error
	ExecuteTx(ctx context.Context, fn func(tx interface{}) error) error
	GetBalances(ctx context.Context, tx interface{}) (bal1, bal2 int, err error)
	UpdateBalance(ctx context.Context, tx interface{}, acct, delta int) error
}
WriteSkewTest abstracts the operations that need to be performed by a particular framework for the purposes of TestExecuteTx. This allows the test to be written once and run for any framework supported by this library.