db

package
v1.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 18, 2026 License: MIT Imports: 21 Imported by: 0

Documentation

Overview

Package db provides DB access and advisory-lock helpers for coordinating high-availability (HA) workers. It includes two styles of advisory locker:

  • AdvisoryLocker: inherits the caller's transaction/connection.
  • TransactionScopedAdvisoryLocker: creates and owns its own transaction.

Package db implements the database connection and management.

Constants

This section is empty.

Variables

This section is empty.

Functions

func ConnectToDB added in v0.4.0

func ConnectToDB(ctx context.Context,
	logger *zap.Logger,
	dsn string,
	namespace string,
	waitForDB time.Duration,
	statementTimeout time.Duration,
	prom *prometheus.Registry,
) (*sql.DB, error)

ConnectToDB establishes a connection to an existing database using the provided DSN. Unlike NewNamespacedDB, this function does not create the database or run migrations. If namespace is provided, it overrides the database name in the DSN.

func Get5MinutesOfCongestion added in v0.3.0

func Get5MinutesOfCongestion(
	ctx context.Context,
	querier *queries.Queries,
	originatorID, endMinute int32,
) (out [5]int32, err error)

Get5MinutesOfCongestion returns the congestion for the minute specified by `endMinute` and for the 4 minutes before it. Values are returned in descending order of minute (index 0 is `endMinute`), and minutes with no recorded value are filled with 0.
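The windowing described above can be sketched without a database. This is a minimal illustration, not the package's implementation: `minuteCount` and `lastFiveMinutes` are hypothetical names, and the row shape returned by the real query is an assumption.

```go
package main

import "fmt"

// minuteCount is a hypothetical stand-in for one row of the congestion
// query: a minute index and the congestion recorded for that minute.
type minuteCount struct {
	Minute int32
	Count  int32
}

// lastFiveMinutes places each row at offset endMinute-Minute, so index 0 is
// endMinute and index 4 is endMinute-4. Minutes without a row keep the zero
// value, and rows outside the window are ignored.
func lastFiveMinutes(rows []minuteCount, endMinute int32) (out [5]int32) {
	for _, r := range rows {
		offset := endMinute - r.Minute
		if offset >= 0 && offset < 5 {
			out[offset] = r.Count
		}
	}
	return out
}

func main() {
	rows := []minuteCount{{Minute: 100, Count: 7}, {Minute: 98, Count: 3}}
	fmt.Println(lastFiveMinutes(rows, 100)) // [7 0 3 0 0]
}
```

A fixed-size array as the return type means callers never need to check the length, even when the table holds no rows for the window.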

func InsertGatewayEnvelopeAndIncrementUnsettledUsage added in v0.3.0

func InsertGatewayEnvelopeAndIncrementUnsettledUsage(
	ctx context.Context,
	db *sql.DB,
	insertParams queries.InsertGatewayEnvelopeParams,
	incrementParams queries.IncrementUnsettledUsageParams,
) (int64, error)

InsertGatewayEnvelopeAndIncrementUnsettledUsage inserts a gateway envelope and updates unsettled usage and congestion counters within a single database transaction.

This function runs inside a managed transaction created by RunInTxWithResult().

Steps:

  1. Calls InsertGatewayEnvelopeWithChecksTransactional() to insert the envelope, automatically creating any missing partitions if needed.
  2. If a new envelope is inserted, increments unsettled usage and congestion counters for the originator within the same transaction.
  3. If the envelope already exists (duplicate insert), metrics are not updated.

The function ensures atomicity between envelope insertion and usage updates. Safe for high-throughput ingest paths where message persistence and usage tracking must succeed or fail together.
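The three steps above amount to "increment only when a row was actually written". The sketch below shows that control flow with plain callbacks. `insertAndCount` and both callbacks are hypothetical; it is also an assumption here that the returned int64 is the number of inserted rows.

```go
package main

import (
	"errors"
	"fmt"
)

// insertAndCount mirrors the documented steps. Both callbacks stand in for
// queries that would run on the same transaction.
func insertAndCount(
	insert func() (rowsInserted int64, err error),
	increment func() error,
) (int64, error) {
	n, err := insert()
	if err != nil {
		return 0, err
	}
	if n == 0 {
		// Duplicate envelope: nothing was written, so counters stay untouched.
		return 0, nil
	}
	if err := increment(); err != nil {
		// Returning the error lets the surrounding transaction roll back
		// the insert as well, which keeps the two updates atomic.
		return 0, err
	}
	return n, nil
}

func main() {
	counters := 0
	bump := func() error { counters++; return nil }

	n, _ := insertAndCount(func() (int64, error) { return 1, nil }, bump)
	fmt.Println(n, counters) // 1 1

	n, _ = insertAndCount(func() (int64, error) { return 0, nil }, bump)
	fmt.Println(n, counters) // 0 1

	_, err := insertAndCount(
		func() (int64, error) { return 1, nil },
		func() error { return errors.New("increment failed") },
	)
	fmt.Println(err) // increment failed
}
```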

func InsertGatewayEnvelopeWithChecksStandalone added in v1.0.0

func InsertGatewayEnvelopeWithChecksStandalone(
	ctx context.Context,
	q *queries.Queries,
	row queries.InsertGatewayEnvelopeParams,
) (queries.InsertGatewayEnvelopeRow, error)

InsertGatewayEnvelopeWithChecksStandalone inserts a gateway envelope in a non-transactional context, automatically creating missing partitions and retrying once.

Behavior:

  • Performs an insert into the v2 tables.
  • On “no partition of relation …” errors, creates the necessary partitions on the same connection and retries the insert once.

This function does not use SAVEPOINTs and does not depend on an explicit transaction. It is ideal for standalone operations such as backfills, batch imports, or ingestion workers where each insert is independent of others.
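The retry-once behavior can be illustrated with two callbacks. This is a sketch under assumptions: `insertWithPartitionRetry` is a hypothetical helper, the table name in the sample error is made up, and matching on the message text is a simplification (real code may inspect a driver-specific error code instead).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errNoPartition imitates the database error text; the relation name is hypothetical.
var errNoPartition = errors.New(`no partition of relation "envelopes" found for row`)

// isMissingPartition matches on the message text, which is an assumption of
// this sketch rather than the package's documented detection method.
func isMissingPartition(err error) bool {
	return err != nil && strings.Contains(err.Error(), "no partition of relation")
}

// insertWithPartitionRetry runs insert, and on a missing-partition error
// creates the partitions and retries exactly once.
func insertWithPartitionRetry(insert func() error, createPartitions func() error) error {
	err := insert()
	if !isMissingPartition(err) {
		return err // success (nil) or an unrelated failure
	}
	if err := createPartitions(); err != nil {
		return fmt.Errorf("create partitions: %w", err)
	}
	return insert() // single retry; a second failure is returned as-is
}

func main() {
	partitionReady := false
	insert := func() error {
		if !partitionReady {
			return errNoPartition
		}
		return nil
	}
	create := func() error { partitionReady = true; return nil }
	fmt.Println(insertWithPartitionRetry(insert, create)) // <nil>
}
```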

func InsertGatewayEnvelopeWithChecksTransactional added in v1.0.0

func InsertGatewayEnvelopeWithChecksTransactional(
	ctx context.Context,
	q *queries.Queries,
	row queries.InsertGatewayEnvelopeParams,
) (queries.InsertGatewayEnvelopeRow, error)

InsertGatewayEnvelopeWithChecksTransactional attempts to insert a gateway envelope inside the current SQL transaction, with automatic partition creation and retry.

Behavior:

  • Creates a SAVEPOINT before the insert so that a failure does not abort the entire tx.
  • On “no partition of relation …” errors, it rolls back to the savepoint, creates the missing partition(s) using the same connection (within the tx), and retries the insert once.
  • On success, the savepoint is released.

This variant must be called within an active transaction. It avoids full tx rollbacks and ensures inserts can proceed even when new partitions are required. Use for transactional ingestion flows where atomicity must be preserved.
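The savepoint sequence can be sketched against a small interface that *sql.Tx satisfies. Everything named here (`execer`, `insertWithSavepoint`, the savepoint name `sp_insert`, the fake `recorder`) is hypothetical and only illustrates the order of statements, not the package's actual code.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
)

// execer is the subset of *sql.Tx this sketch needs; *sql.Tx satisfies it.
type execer interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
}

func insertWithSavepoint(ctx context.Context, tx execer, insert, createPartitions func(context.Context) error) error {
	if _, err := tx.ExecContext(ctx, "SAVEPOINT sp_insert"); err != nil {
		return err
	}
	err := insert(ctx)
	if err != nil && strings.Contains(err.Error(), "no partition of relation") {
		// Undo only the failed insert so the outer transaction stays usable.
		if _, rbErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT sp_insert"); rbErr != nil {
			return rbErr
		}
		if err = createPartitions(ctx); err == nil {
			err = insert(ctx) // single retry
		}
	}
	if err != nil {
		return err // the caller decides how to end the transaction
	}
	_, err = tx.ExecContext(ctx, "RELEASE SAVEPOINT sp_insert")
	return err
}

// recorder is a fake execer that only records the statements it receives.
type recorder struct{ stmts []string }

func (r *recorder) ExecContext(_ context.Context, q string, _ ...any) (sql.Result, error) {
	r.stmts = append(r.stmts, q)
	return nil, nil
}

// demo runs the sketch against the fake: the first insert fails with a
// missing-partition error and the retry succeeds.
func demo() ([]string, error) {
	rec := &recorder{}
	ready := false
	insert := func(context.Context) error {
		if !ready {
			return errors.New("no partition of relation found for row")
		}
		return nil
	}
	create := func(context.Context) error { ready = true; return nil }
	err := insertWithSavepoint(context.Background(), rec, insert, create)
	return rec.stmts, err
}

func main() {
	stmts, err := demo()
	fmt.Println(err)
	for _, s := range stmts {
		fmt.Println(s)
	}
}
```

Rolling back to the savepoint matters because PostgreSQL marks a transaction as aborted after any failed statement; without it, the partition creation and the retry would both be rejected.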

func NewNamespacedDB added in v0.1.2

func NewNamespacedDB(
	ctx context.Context,
	logger *zap.Logger,
	dsn string,
	namespace string,
	waitForDB time.Duration,
	statementTimeout time.Duration,
	prom *prometheus.Registry,
) (*sql.DB, error)

NewNamespacedDB creates a new database with the given namespace if it doesn't exist and returns a *sql.DB connected to it. Unlike ConnectToDB, it also runs migrations.

func NullInt32

func NullInt32(v int32) sql.NullInt32

func NullInt64

func NullInt64(v int64) sql.NullInt64

func RunInTx

func RunInTx(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, txQueries *queries.Queries) error,
) error

func RunInTxRaw added in v0.5.0

func RunInTxRaw(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, tx *sql.Tx) error,
) error

func RunInTxWithResult added in v0.3.0

func RunInTxWithResult[T any](
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, txQueries *queries.Queries) (T, error),
) (result T, err error)

func TransformRowsByOriginator added in v1.0.0

func TransformRowsByOriginator(
	rows []queries.SelectGatewayEnvelopesByOriginatorsRow,
) []queries.GatewayEnvelopesView

func TransformRowsByTopic added in v1.0.0

Types

type AdvisoryLocker added in v1.0.0

type AdvisoryLocker struct{}

AdvisoryLocker builds advisory-lock keys and acquires locks using the caller’s connection/transaction via the provided *queries.Queries.

func NewAdvisoryLocker added in v1.0.0

func NewAdvisoryLocker() *AdvisoryLocker

func (*AdvisoryLocker) LockIdentityUpdateInsert added in v1.0.0

func (a *AdvisoryLocker) LockIdentityUpdateInsert(
	ctx context.Context,
	queries *queries.Queries,
	nodeID uint32,
) error

func (*AdvisoryLocker) TryLockAttestationWorker added in v1.1.0

func (a *AdvisoryLocker) TryLockAttestationWorker(
	ctx context.Context,
	queries *queries.Queries,
) (bool, error)

func (*AdvisoryLocker) TryLockGeneratorWorker added in v1.1.0

func (a *AdvisoryLocker) TryLockGeneratorWorker(
	ctx context.Context,
	queries *queries.Queries,
) (bool, error)

func (*AdvisoryLocker) TryLockSettlementWorker added in v1.1.0

func (a *AdvisoryLocker) TryLockSettlementWorker(
	ctx context.Context,
	queries *queries.Queries,
) (bool, error)

func (*AdvisoryLocker) TryLockSubmitterWorker added in v1.1.0

func (a *AdvisoryLocker) TryLockSubmitterWorker(
	ctx context.Context,
	queries *queries.Queries,
) (bool, error)

type DBSubscription

type DBSubscription[ValueType any, CursorType any] struct {
	// contains filtered or unexported fields
}

DBSubscription is a subscription that polls a DB for updates. It assumes there is only one listener: updates block on a single unbuffered channel.

func NewDBSubscription

func NewDBSubscription[ValueType any, CursorType any](
	ctx context.Context,
	logger *zap.Logger,
	query PollableDBQuery[ValueType, CursorType],
	lastSeen CursorType,
	options PollingOptions,
) *DBSubscription[ValueType, CursorType]

func (*DBSubscription[ValueType, CursorType]) Start

func (s *DBSubscription[ValueType, CursorType]) Start() (<-chan []ValueType, error)

type Handler added in v1.1.0

type Handler struct {
	// contains filtered or unexported fields
}

Handler eases working with two databases: a read-write database and a read-only one. It reduces the risk of a component attempting a write without knowing that it received a handle to a read-only SQL DB. Handler also makes the query intent explicit and routes each request to the appropriate DB. It also eases the transition when code that used to need only read access later needs to write data.

func NewDBHandler added in v1.1.0

func NewDBHandler(db *sql.DB, options ...HandlerOption) *Handler

NewDBHandler creates a new database handler with two database connections: a read-write one and a read-only one. If there is no dedicated read replica, it can be omitted and the read-write database will be used for reads as well.
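The fallback to the read-write database can be sketched with the functional-options pattern that the HandlerOption type suggests. The types below are hypothetical stand-ins: `conn` replaces *sql.DB so the example runs without a database, and the lower-case names are not the package's API.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for *sql.DB.
type conn struct{ name string }

type handlerConfig struct{ read *conn }

type option func(*handlerConfig)

// withReadReplica registers a dedicated read-only handle.
func withReadReplica(c *conn) option {
	return func(cfg *handlerConfig) { cfg.read = c }
}

type handler struct{ write, read *conn }

func newHandler(write *conn, opts ...option) *handler {
	var cfg handlerConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	h := &handler{write: write, read: cfg.read}
	if h.read == nil {
		h.read = write // no dedicated replica: reads share the read-write handle
	}
	return h
}

func (h *handler) Read() *conn  { return h.read }
func (h *handler) Write() *conn { return h.write }

func main() {
	primary := &conn{name: "primary"}
	replica := &conn{name: "replica"}

	fmt.Println(newHandler(primary).Read().name)                           // primary
	fmt.Println(newHandler(primary, withReadReplica(replica)).Read().name) // replica
}
```

Because callers always ask for Read() or Write() explicitly, adding a replica later changes only the constructor call, not the call sites.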

func (*Handler) Close added in v1.1.0

func (h *Handler) Close() error

func (*Handler) DB added in v1.1.0

func (h *Handler) DB() *sql.DB

func (*Handler) Query added in v1.1.0

func (h *Handler) Query() *queries.Queries

func (*Handler) Read added in v1.1.0

func (h *Handler) Read() *sql.DB

func (*Handler) ReadQuery added in v1.1.0

func (h *Handler) ReadQuery() *queries.Queries

func (*Handler) Write added in v1.1.0

func (h *Handler) Write() *sql.DB

func (*Handler) WriteQuery added in v1.1.0

func (h *Handler) WriteQuery() *queries.Queries

type HandlerOption added in v1.1.0

type HandlerOption func(*handlerConfig)

func WithReadReplica added in v1.1.0

func WithReadReplica(db *sql.DB) HandlerOption

type ITransactionScopedAdvisoryLocker added in v1.0.0

type ITransactionScopedAdvisoryLocker interface {
	Release() error
	TryLockGeneratorWorker() (bool, error)
	TryLockAttestationWorker() (bool, error)
	TryLockSubmitterWorker() (bool, error)
	TryLockSettlementWorker() (bool, error)
	LockIdentityUpdateInsert(nodeID uint32) error
}

type LockKind added in v1.0.0

type LockKind uint8

LockKind marks the lowest 8 bits of the advisory lock key.

const (
	LockKindIdentityUpdateInsert LockKind = 0x00
	LockKindAttestationWorker    LockKind = 0x01
	LockKindSubmitterWorker      LockKind = 0x02
	LockKindSettlementWorker     LockKind = 0x03
	LockKindGeneratorWorker      LockKind = 0x04
)
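One way to picture a key whose lowest 8 bits carry the kind is shown below. The layout of the remaining bits is not documented here, so placing a scope value such as a node ID directly above the kind is purely an assumption of this sketch, as are the names `lockKind` and `lockKey`.

```go
package main

import "fmt"

type lockKind uint8

const (
	kindIdentityUpdateInsert lockKind = 0x00
	kindAttestationWorker    lockKind = 0x01
)

// lockKey keeps the kind in bits 0-7 and, as an assumption of this sketch,
// shifts a scope value such as a node ID into the bits above it. A uint32
// shifted by 8 still fits in the 64-bit key that advisory locks accept.
func lockKey(scope uint32, kind lockKind) int64 {
	return int64(scope)<<8 | int64(kind)
}

func main() {
	fmt.Printf("%#x\n", lockKey(100, kindIdentityUpdateInsert)) // 0x6400
	fmt.Printf("%#x\n", lockKey(0, kindAttestationWorker))      // 0x1
}
```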

type PollableDBQuery

type PollableDBQuery[ValueType any, CursorType any] func(
	ctx context.Context,
	lastSeen CursorType,
	numRows int32,
) (results []ValueType, nextCursor CursorType, err error)

type PollingOptions

type PollingOptions struct {
	Interval time.Duration
	Notifier <-chan bool
	NumRows  int32
}

PollingOptions specifies the polling options for a DB subscription. It can poll whenever notified, or at an interval if not notified.
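A notify-or-interval polling loop with a cursor and a single unbuffered channel can be sketched as follows. This is not the package's implementation: `pollFn`, `pollOpts`, and `subscribe` are hypothetical names that mirror the shapes of PollableDBQuery and PollingOptions, and the sketch assumes Interval is positive.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type pollFn[V any, C any] func(ctx context.Context, lastSeen C, numRows int32) ([]V, C, error)

type pollOpts struct {
	Interval time.Duration // must be > 0 in this sketch
	Notifier <-chan bool
	NumRows  int32
}

func subscribe[V any, C any](ctx context.Context, query pollFn[V, C], lastSeen C, opts pollOpts) <-chan []V {
	out := make(chan []V) // unbuffered: a slow listener blocks further polling
	notifier := opts.Notifier
	go func() {
		defer close(out)
		ticker := time.NewTicker(opts.Interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case _, ok := <-notifier: // a nil notifier never fires
				if !ok {
					notifier = nil // closed: fall back to the ticker only
					continue
				}
			case <-ticker.C:
			}
			rows, next, err := query(ctx, lastSeen, opts.NumRows)
			if err != nil || len(rows) == 0 {
				continue // this sketch simply tries again on the next wake-up
			}
			select {
			case out <- rows:
				lastSeen = next // advance the cursor only after delivery
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// demo pages through an in-memory slice, using the slice index as the cursor.
func demo() [][]int {
	data := []int{1, 2, 3, 4, 5}
	query := func(_ context.Context, lastSeen int, numRows int32) ([]int, int, error) {
		end := lastSeen + int(numRows)
		if end > len(data) {
			end = len(data)
		}
		return data[lastSeen:end], end, nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	updates := subscribe[int, int](ctx, query, 0, pollOpts{Interval: time.Millisecond, NumRows: 2})
	var batches [][]int
	for len(batches) < 3 {
		batches = append(batches, <-updates)
	}
	return batches
}

func main() {
	fmt.Println(demo()) // [[1 2] [3 4] [5]]
}
```

Advancing the cursor only after the listener has received a batch means a cancelled send never skips rows.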

type Topic

type Topic = []byte

type TransactionScopedAdvisoryLocker added in v1.0.0

type TransactionScopedAdvisoryLocker struct {
	// contains filtered or unexported fields
}

TransactionScopedAdvisoryLocker creates and owns a transaction; its methods acquire advisory locks within that tx. Release() rolls back the tx.

func NewTransactionScopedAdvisoryLocker added in v1.0.0

func NewTransactionScopedAdvisoryLocker(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
) (*TransactionScopedAdvisoryLocker, error)

func (*TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert added in v1.0.0

func (a *TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert(nodeID uint32) error

func (*TransactionScopedAdvisoryLocker) Release added in v1.0.0

func (*TransactionScopedAdvisoryLocker) TryLockAttestationWorker added in v1.1.0

func (a *TransactionScopedAdvisoryLocker) TryLockAttestationWorker() (bool, error)

func (*TransactionScopedAdvisoryLocker) TryLockGeneratorWorker added in v1.1.0

func (a *TransactionScopedAdvisoryLocker) TryLockGeneratorWorker() (bool, error)

func (*TransactionScopedAdvisoryLocker) TryLockSettlementWorker added in v1.1.0

func (a *TransactionScopedAdvisoryLocker) TryLockSettlementWorker() (bool, error)

func (*TransactionScopedAdvisoryLocker) TryLockSubmitterWorker added in v1.1.0

func (a *TransactionScopedAdvisoryLocker) TryLockSubmitterWorker() (bool, error)

type VectorClock

type VectorClock = map[uint32]uint64

func ToVectorClock

func ToVectorClock(rows []queries.GatewayEnvelopesLatest) VectorClock
