db

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Size limits for paged queries and batch operations.
const (
	// DefaultPageSize is the default page size, i.e. the number of query results
	// to request from the DB.
	DefaultPageSize = 20

	// MaxBatchItems limits the maximum number of items allowed in a single batch operation
	// to prevent performance degradation and potential timeouts from overly large batches.
	MaxBatchItems = 100

	// MaxBatchItemsToTrace limits the number of items traced in detail for batch operations
	// to avoid producing overly large spans and to reduce the risk of hitting tracing backend limits.
	// Items beyond this limit will still be processed, but their individual field values will not be traced.
	MaxBatchItemsToTrace = 20
)
View Source
const (
	// DefaultTxLockTimeoutSeconds is the default transaction lock timeout,
	// expressed in seconds (300 seconds = 5 minutes).
	// NOTE(review): where this timeout is applied is not visible here — confirm against the implementation.
	DefaultTxLockTimeoutSeconds = 300 // in seconds
)

Variables

View Source
// Sentinel errors exposed by this package. Each one is a distinct value created with errors.New.
var (
	// ErrDoesNotExist is raised when a DB query fails to find the requested entity.
	ErrDoesNotExist = errors.New("the requested entity does not exist")
	// ErrDBError is a generalized error to expose to the user when unexpected errors occur
	// while communicating with the DB.
	ErrDBError = errors.New("error communicating with data store")
	// ErrInvalidValue is raised when a value to be stored in the DB is invalid.
	ErrInvalidValue = errors.New("provided value is invalid")
	// ErrInvalidParams is raised when a function is called with an invalid or conflicting set of parameters.
	ErrInvalidParams = errors.New("provided params are invalid or conflicting")

	// ErrXactAdvisoryLockFailed indicates that the transaction advisory lock could not be taken.
	ErrXactAdvisoryLockFailed = errors.New("unable to take transaction advisory lock")
	// ErrSessionAdvisoryLockFailed indicates that the session advisory lock could not be taken.
	ErrSessionAdvisoryLockFailed = errors.New("unable to take session advisory lock")
	// ErrSessionAdvisoryLockUnlockFailed indicates that the session advisory lock could not be released,
	// or that the lock was not held by this session.
	ErrSessionAdvisoryLockUnlockFailed = errors.New("unable to release session advisory lock or lock was not held by this session")

	// ErrInvalidPort indicates the DB_PORT environment variable is not a valid integer.
	ErrInvalidPort = errors.New("failed to parse DB_PORT")
	// ErrInvalidCredential indicates the credential is not valid.
	ErrInvalidCredential = errors.New("invalid credential")
)

Functions

func CompareStringSlicesIgnoreOrder

func CompareStringSlicesIgnoreOrder(a, b []string) bool

CompareStringSlicesIgnoreOrder compares two string slices ignoring order

func CurTime

func CurTime() time.Time

CurTime returns the current UTC time rounded to microseconds (useful for DB timestamps).

func GetAdvisoryLockIDFromString

func GetAdvisoryLockIDFromString(id string) uint64

GetAdvisoryLockIDFromString returns the advisory lock ID derived from a string. Note: pg expects the lock ID to not have the MSB (most significant bit) set.

func GetBoolPtr

func GetBoolPtr(b bool) *bool

GetBoolPtr returns a pointer for the provided bool

func GetCurTime

func GetCurTime() time.Time

GetCurTime returns the current time

func GetIDB

func GetIDB(tx *Tx, dbSession *Session) bun.IDB

GetIDB is used by DAO methods to get the DB interface. If the DAO method's tx parameter is non-nil, return it; else return the dbSession. Note: both bun.Tx and bun.DB implement bun.IDB.

func GetIntPtr

func GetIntPtr(i int) *int

GetIntPtr returns a pointer for the provided int

func GetStrPtr

func GetStrPtr(s string) *string

GetStrPtr returns a pointer for the provided string

func GetStringToTsQuery

func GetStringToTsQuery(inputQuery string) string

GetStringToTsQuery converts the input string into to_tsquery format and returns the resulting string.

func GetStringToUint64Hash

func GetStringToUint64Hash(id string) uint64

GetStringToUint64Hash returns a uint64 hash of the input string. This is used for advisory lock IDs.

func GetTimePtr

func GetTimePtr(t time.Time) *time.Time

GetTimePtr returns a pointer for the provided time

func GetUUIDPtr

func GetUUIDPtr(u uuid.UUID) *uuid.UUID

GetUUIDPtr returns a pointer for the provided UUID

func IsStrInSlice

func IsStrInSlice(s string, sl []string) bool

IsStrInSlice returns true if the provided string is in the provided slice

func RollbackTx

func RollbackTx(ctx context.Context, tx *Tx, committed *bool)

RollbackTx is meant to be called via defer in functions that create a transaction. If the transaction was committed, this will do nothing.

Types

type Config

// Config represents the configuration needed to connect to a database.
type Config struct {
	// Host is the database host. Presumably read from DB_HOST by ConfigFromEnv — confirm.
	Host string
	// Port is the database port. Presumably read from DB_PORT by ConfigFromEnv — confirm.
	Port int
	// DBName is the name of the database. Presumably read from DB_NAME by ConfigFromEnv — confirm.
	DBName string
	// Credential is the credential used to connect.
	// NOTE(review): presumably built from DB_USER / DB_PASSWORD by ConfigFromEnv — confirm.
	Credential credential.Credential
	// CACertificatePath is the path to a CA certificate.
	// NOTE(review): presumably optional and read from DB_CERT_PATH by ConfigFromEnv — confirm.
	CACertificatePath string
}

Config represents the configuration needed to connect to a database.

func ConfigFromEnv

func ConfigFromEnv() (Config, error)

ConfigFromEnv builds a Config from environment variables. Reads: DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME, DB_CERT_PATH (optional CA certificate).

func (*Config) BuildDSN

func (c *Config) BuildDSN() string

BuildDSN builds the Data Source Name (DSN) string for connecting to the database.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the Config fields are set correctly.

type ErrorChecker

// ErrorChecker abstracts database error classification.
type ErrorChecker interface {
	// IsErrNoRows reports whether err is classified as a "no rows" error.
	IsErrNoRows(err error) bool
	// IsUniqueConstraintError reports whether err is classified as a unique constraint violation.
	IsUniqueConstraintError(err error) bool
}

ErrorChecker abstracts database error classification.

type LockRetryOptions

// LockRetryOptions carries the retry settings accepted by (*Tx).TryAcquireAdvisoryLock.
// NOTE(review): presumably a nil field selects the default described for TryAcquireAdvisoryLock
// (3 retries, 300ms initial delay, 100ms max jitter) — confirm against the implementation.
type LockRetryOptions struct {
	// Retries is the retry count for lock acquisition attempts.
	Retries *int
	// Delay is the delay between retries; presumably the initial delay of the backoff — confirm.
	Delay *time.Duration
	// Jitter is the jitter applied to the retry delay; presumably an upper bound — confirm.
	Jitter *time.Duration
}

type PostgresErrorChecker

// PostgresErrorChecker classifies common Postgres errors such as no rows and
// unique constraint violations. It has no fields.
type PostgresErrorChecker struct{}

PostgresErrorChecker classifies common Postgres errors such as no rows and unique constraint violations.

func (*PostgresErrorChecker) IsErrNoRows

func (checker *PostgresErrorChecker) IsErrNoRows(err error) bool

func (*PostgresErrorChecker) IsUniqueConstraintError

func (checker *PostgresErrorChecker) IsUniqueConstraintError(err error) bool

type Session

// Session is a wrapper for an ORM DB object.
type Session struct {
	// DBName is the name of the database associated with this session.
	DBName string
	// DB is the wrapped bun database handle.
	DB *bun.DB
	// contains filtered or unexported fields
}

Session is a wrapper for an ORM DB object

func NewSession

func NewSession(ctx context.Context, host string, port int, dbName string, user string, password string, caCertPath string) (*Session, error)

NewSession creates and returns a new session object using pgx v5 + pgxpool. It delegates to NewSessionFromConfig to keep DSN logic centralized.

func NewSessionFromConfig

func NewSessionFromConfig(ctx context.Context, c Config) (*Session, error)

NewSessionFromConfig creates a Session from a Config.

func (*Session) BeginTx

func (s *Session) BeginTx(ctx context.Context) (bun.Tx, error)

BeginTx begins a new transaction with default options.

func (*Session) Close

func (s *Session) Close()

Close closes the session and the underlying connection pool.

func (*Session) GetErrorChecker

func (s *Session) GetErrorChecker() ErrorChecker

GetErrorChecker returns the error classifier for this session.

func (*Session) RunInTx

func (s *Session) RunInTx(
	ctx context.Context,
	fn func(ctx context.Context, tx bun.Tx) error,
) error

RunInTx executes a function within a transaction.

type Tx

// Tx is a thin wrapper around the bun.Tx object. The wrapped transaction is
// reachable through GetBunTx.
type Tx struct {
	// contains filtered or unexported fields
}

Tx is a thin wrapper around the bun.Tx object

func BeginTx

func BeginTx(ctx context.Context, dbSession *Session, txOptions *sql.TxOptions) (*Tx, error)

BeginTx wraps bun's BeginTx

func (*Tx) AcquireAdvisoryLock

func (tx *Tx) AcquireAdvisoryLock(ctx context.Context, lockID uint64, blocking bool) error

AcquireAdvisoryLock will "try" to take the specified advisory lock on the transaction. Error case: if the lock is already held by another transaction, this will error, and the caller needs to (possibly) retry in the same transaction after a delay (this is the api-handler usecase), or retry in a new transaction after rolling back the current transaction (this is the workflow worker usecase). Success case: the transaction lock, when acquired, is automatically released when the transaction commits or rolls back (or when the transaction connection dies, which is equivalent to a rollback for the transaction).

func (*Tx) Commit

func (tx *Tx) Commit() error

Commit wraps bun's Commit

func (*Tx) GetBunTx

func (tx *Tx) GetBunTx() *bun.Tx

GetBunTx gets the bun transaction object

func (*Tx) Rollback

func (tx *Tx) Rollback() error

Rollback wraps bun's Rollback

func (*Tx) TryAcquireAdvisoryLock

func (tx *Tx) TryAcquireAdvisoryLock(ctx context.Context, lockID uint64, options *LockRetryOptions) error

TryAcquireAdvisoryLock acquires an advisory lock, retrying (up to retryCnt times, which defaults to 3) when the lock acquisition attempt fails. Note that each lock acquisition attempt is a non-blocking pg_try_advisory_xact_lock. Retries back off exponentially, starting with an initial delay of 300ms (300ms, 600ms, 1200ms, etc.), with a max jitter of 100ms.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL