db

package
v1.0.0

This package is not in the latest version of its module.
Published: Nov 27, 2025 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package db provides DB access and advisory-lock helpers for coordinating HA workers. It includes two styles:

  • AdvisoryLocker: inherits the caller's transaction/connection.
  • TransactionScopedAdvisoryLocker: creates and owns its own transaction.

Package db implements the database connection and management.

Index

Functions

func ConnectToDB added in v0.4.0

func ConnectToDB(
	ctx context.Context,
	logger *zap.Logger,
	dsn string,
	namespace string,
	waitForDB time.Duration,
	statementTimeout time.Duration,
	prom *prometheus.Registry,
) (*sql.DB, error)

ConnectToDB establishes a connection to an existing database using the provided DSN. Unlike NewNamespacedDB, this function does not create the database or run migrations. If namespace is provided, it overrides the database name in the DSN.
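The namespace override described above can be pictured as rewriting the database-name portion of the DSN. The following is a minimal sketch, assuming a URL-style DSN (for example `postgres://user:pass@host:5432/dbname`); `overrideDBName` is a hypothetical helper written for illustration and is not part of this package.

```go
package main

import (
	"fmt"
	"net/url"
)

// overrideDBName returns dsn with its database name replaced by namespace.
// An empty namespace leaves the DSN unchanged.
// Hypothetical helper: assumes a URL-style DSN, not a key=value DSN.
func overrideDBName(dsn, namespace string) (string, error) {
	if namespace == "" {
		return dsn, nil
	}
	u, err := url.Parse(dsn)
	if err != nil {
		return "", fmt.Errorf("parse dsn: %w", err)
	}
	// In a URL-style DSN the path component carries the database name.
	u.Path = "/" + namespace
	return u.String(), nil
}

func main() {
	out, err := overrideDBName(
		"postgres://user:pass@localhost:5432/postgres?sslmode=disable",
		"node_a",
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Query parameters such as `sslmode` are preserved because only the path component is replaced.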

func Get5MinutesOfCongestion added in v0.3.0

func Get5MinutesOfCongestion(
	ctx context.Context,
	querier *queries.Queries,
	originatorID, endMinute int32,
) (out [5]int32, err error)

Get5MinutesOfCongestion returns the congestion for the minute specified by `endMinute` and for the previous 4 minutes, in descending order, with missing values filled with 0.
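A minimal sketch of the zero-filling behavior, assuming "descending order" means index 0 holds `endMinute` and index 4 holds `endMinute-4`. The `minuteCount` row type and `lastFiveMinutes` function are hypothetical stand-ins; the real query result type is not shown on this page.

```go
package main

import "fmt"

// minuteCount is a hypothetical row: the congestion recorded for one minute.
type minuteCount struct {
	Minute int32
	Count  int32
}

// lastFiveMinutes places each row at index endMinute-Minute, so index 0 is
// endMinute and index 4 is endMinute-4. Minutes without a row stay 0, and
// rows outside the five-minute window are ignored.
func lastFiveMinutes(rows []minuteCount, endMinute int32) (out [5]int32) {
	for _, r := range rows {
		idx := endMinute - r.Minute
		if idx >= 0 && idx < 5 {
			out[idx] = r.Count
		}
	}
	return out
}

func main() {
	rows := []minuteCount{{Minute: 100, Count: 7}, {Minute: 98, Count: 3}}
	fmt.Println(lastFiveMinutes(rows, 100)) // prints [7 0 3 0 0]
}
```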

func InsertGatewayEnvelopeAndIncrementUnsettledUsage added in v0.3.0

func InsertGatewayEnvelopeAndIncrementUnsettledUsage(
	ctx context.Context,
	db *sql.DB,
	insertParams queries.InsertGatewayEnvelopeParams,
	incrementParams queries.IncrementUnsettledUsageParams,
) (int64, error)

InsertGatewayEnvelopeAndIncrementUnsettledUsage inserts a gateway envelope and updates unsettled usage and congestion counters within a single database transaction.

This function runs inside a managed transaction created by RunInTxWithResult().

Steps:

  1. Calls InsertGatewayEnvelopeWithChecksTransactional() to insert the envelope, automatically creating any missing partitions if needed.
  2. If a new envelope is inserted, increments unsettled usage and congestion counters for the originator within the same transaction.
  3. If the envelope already exists (duplicate insert), metrics are not updated.

The function ensures atomicity between envelope insertion and usage updates. Safe for high-throughput ingest paths where message persistence and usage tracking must succeed or fail together.

func InsertGatewayEnvelopeWithChecksStandalone added in v1.0.0

func InsertGatewayEnvelopeWithChecksStandalone(
	ctx context.Context,
	q *queries.Queries,
	row queries.InsertGatewayEnvelopeParams,
) (queries.InsertGatewayEnvelopeRow, error)

InsertGatewayEnvelopeWithChecksStandalone inserts a gateway envelope in a non-transactional context, automatically creating missing partitions and retrying once.

Behavior:

  • Performs an insert into the v2 tables.
  • On “no partition of relation …” errors, creates the necessary partitions in the same connection, and retries the insert once.

This function does not use SAVEPOINTs and does not depend on an explicit transaction. It is ideal for standalone operations such as backfills, batch imports, or ingestion workers where each insert is independent of others.

func InsertGatewayEnvelopeWithChecksTransactional added in v1.0.0

func InsertGatewayEnvelopeWithChecksTransactional(
	ctx context.Context,
	q *queries.Queries,
	row queries.InsertGatewayEnvelopeParams,
) (queries.InsertGatewayEnvelopeRow, error)

InsertGatewayEnvelopeWithChecksTransactional attempts to insert a gateway envelope inside the current SQL transaction, with automatic partition creation and retry.

Behavior:

  • Creates a SAVEPOINT before the insert so that a failure does not abort the entire tx.
  • On “no partition of relation …” errors, it rolls back to the savepoint, creates the missing partition(s) using the same connection (within the tx), and retries the insert once.
  • On success, the savepoint is released.

This variant must be called within an active transaction. It avoids full tx rollbacks and ensures inserts can proceed even when new partitions are required. Use for transactional ingestion flows where atomicity must be preserved.

func NewNamespacedDB added in v0.1.2

func NewNamespacedDB(
	ctx context.Context,
	logger *zap.Logger,
	dsn string,
	namespace string,
	waitForDB time.Duration,
	statementTimeout time.Duration,
	prom *prometheus.Registry,
) (*sql.DB, error)

NewNamespacedDB creates a new database with the given namespace if it doesn't exist and returns a *sql.DB connected to the new database.

func NullInt32

func NullInt32(v int32) sql.NullInt32

func NullInt64

func NullInt64(v int64) sql.NullInt64

func RunInTx

func RunInTx(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, txQueries *queries.Queries) error,
) error

func RunInTxRaw added in v0.5.0

func RunInTxRaw(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, tx *sql.Tx) error,
) error

func RunInTxWithResult added in v0.3.0

func RunInTxWithResult[T any](
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
	fn func(ctx context.Context, txQueries *queries.Queries) (T, error),
) (result T, err error)

func TransformRowsByOriginator added in v1.0.0

func TransformRowsByOriginator(
	rows []queries.SelectGatewayEnvelopesByOriginatorsRow,
) []queries.GatewayEnvelopesView

func TransformRowsByTopic added in v1.0.0

Types

type AdvisoryLocker added in v1.0.0

type AdvisoryLocker struct{}

AdvisoryLocker builds advisory-lock keys and acquires locks using the caller’s connection/transaction via the provided *queries.Queries.

func NewAdvisoryLocker added in v1.0.0

func NewAdvisoryLocker() *AdvisoryLocker

func (*AdvisoryLocker) LockAttestationWorker added in v1.0.0

func (a *AdvisoryLocker) LockAttestationWorker(
	ctx context.Context,
	queries *queries.Queries,
) error

func (*AdvisoryLocker) LockGeneratorWorker added in v1.0.0

func (a *AdvisoryLocker) LockGeneratorWorker(
	ctx context.Context,
	queries *queries.Queries,
) error

func (*AdvisoryLocker) LockIdentityUpdateInsert added in v1.0.0

func (a *AdvisoryLocker) LockIdentityUpdateInsert(
	ctx context.Context,
	queries *queries.Queries,
	nodeID uint32,
) error

func (*AdvisoryLocker) LockSettlementWorker added in v1.0.0

func (a *AdvisoryLocker) LockSettlementWorker(
	ctx context.Context,
	queries *queries.Queries,
) error

func (*AdvisoryLocker) LockSubmitterWorker added in v1.0.0

func (a *AdvisoryLocker) LockSubmitterWorker(
	ctx context.Context,
	queries *queries.Queries,
) error

type DBSubscription

type DBSubscription[ValueType any, CursorType any] struct {
	// contains filtered or unexported fields
}

DBSubscription is a subscription that polls a DB for updates. It assumes there is only one listener (updates block on a single unbuffered channel).

func NewDBSubscription

func NewDBSubscription[ValueType any, CursorType any](
	ctx context.Context,
	logger *zap.Logger,
	query PollableDBQuery[ValueType, CursorType],
	lastSeen CursorType,
	options PollingOptions,
) *DBSubscription[ValueType, CursorType]

func (*DBSubscription[ValueType, CursorType]) Start

func (s *DBSubscription[ValueType, CursorType]) Start() (<-chan []ValueType, error)

type ITransactionScopedAdvisoryLocker added in v1.0.0

type ITransactionScopedAdvisoryLocker interface {
	Release() error
	LockGeneratorWorker() error
	LockAttestationWorker() error
	LockSubmitterWorker() error
	LockSettlementWorker() error
	LockIdentityUpdateInsert(nodeID uint32) error
}

type LockKind added in v1.0.0

type LockKind uint8

LockKind marks the lowest 8 bits of the advisory lock key.

const (
	LockKindIdentityUpdateInsert LockKind = 0x00
	LockKindAttestationWorker    LockKind = 0x01
	LockKindSubmitterWorker      LockKind = 0x02
	LockKindSettlementWorker     LockKind = 0x03
	LockKindGeneratorWorker      LockKind = 0x04
)
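Since the kind occupies the lowest 8 bits, a key can be pictured as a scope value shifted left by 8 and OR-ed with the kind. The layout of the upper bits is an assumption here (the page does not specify it); `lockKey` and `kindOf` are hypothetical helpers. PostgreSQL advisory locks accept a 64-bit key, which is why the sketch returns `int64`.

```go
package main

import "fmt"

// LockKind and its values are copied from this page.
type LockKind uint8

const (
	LockKindIdentityUpdateInsert LockKind = 0x00
	LockKindAttestationWorker    LockKind = 0x01
	LockKindSubmitterWorker      LockKind = 0x02
	LockKindSettlementWorker     LockKind = 0x03
	LockKindGeneratorWorker      LockKind = 0x04
)

// lockKey packs scope into the bits above the 8-bit kind.
// Assumption: scope is a 32-bit value such as a node ID.
func lockKey(scope uint32, kind LockKind) int64 {
	return int64(scope)<<8 | int64(kind)
}

// kindOf recovers the kind from the lowest 8 bits of a key.
func kindOf(key int64) LockKind {
	return LockKind(key & 0xff)
}

func main() {
	key := lockKey(7, LockKindSubmitterWorker)
	fmt.Printf("%#x\n", key) // prints 0x702
	fmt.Println(kindOf(key) == LockKindSubmitterWorker)
}
```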

type PollableDBQuery

type PollableDBQuery[ValueType any, CursorType any] func(
	ctx context.Context,
	lastSeen CursorType,
	numRows int32,
) (results []ValueType, nextCursor CursorType, err error)

type PollingOptions

type PollingOptions struct {
	Interval time.Duration
	Notifier <-chan bool
	NumRows  int32
}

PollingOptions specifies the polling options for a DB subscription. It can poll whenever notified, or at an interval if not notified.

type Topic

type Topic = []byte

type TransactionScopedAdvisoryLocker added in v1.0.0

type TransactionScopedAdvisoryLocker struct {
	// contains filtered or unexported fields
}

TransactionScopedAdvisoryLocker creates and owns a transaction; methods acquire advisory locks within that tx. Release() rolls back the tx.

func NewTransactionScopedAdvisoryLocker added in v1.0.0

func NewTransactionScopedAdvisoryLocker(
	ctx context.Context,
	db *sql.DB,
	opts *sql.TxOptions,
) (*TransactionScopedAdvisoryLocker, error)

func (*TransactionScopedAdvisoryLocker) LockAttestationWorker added in v1.0.0

func (a *TransactionScopedAdvisoryLocker) LockAttestationWorker() error

func (*TransactionScopedAdvisoryLocker) LockGeneratorWorker added in v1.0.0

func (a *TransactionScopedAdvisoryLocker) LockGeneratorWorker() error

func (*TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert added in v1.0.0

func (a *TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert(nodeID uint32) error

func (*TransactionScopedAdvisoryLocker) LockSettlementWorker added in v1.0.0

func (a *TransactionScopedAdvisoryLocker) LockSettlementWorker() error

func (*TransactionScopedAdvisoryLocker) LockSubmitterWorker added in v1.0.0

func (a *TransactionScopedAdvisoryLocker) LockSubmitterWorker() error

func (*TransactionScopedAdvisoryLocker) Release added in v1.0.0

type VectorClock

type VectorClock = map[uint32]uint64

func ToVectorClock

func ToVectorClock(rows []queries.GatewayEnvelopesLatest) VectorClock
