Documentation
¶
Overview ¶
Package db provides DB access and advisory-lock helpers for coordinating HA workers. It includes two styles:
- AdvisoryLocker: inherits the caller's transaction/connection.
- TransactionScopedAdvisoryLocker: creates and owns its own transaction.
Package db implements the database connection and management.
Index ¶
- func ConnectToDB(ctx context.Context, logger *zap.Logger, dsn string, namespace string, ...) (*sql.DB, error)
- func Get5MinutesOfCongestion(ctx context.Context, querier *queries.Queries, originatorID, endMinute int32) (out [5]int32, err error)
- func InsertGatewayEnvelopeAndIncrementUnsettledUsage(ctx context.Context, db *sql.DB, ...) (int64, error)
- func InsertGatewayEnvelopeWithChecksStandalone(ctx context.Context, q *queries.Queries, ...) (queries.InsertGatewayEnvelopeRow, error)
- func InsertGatewayEnvelopeWithChecksTransactional(ctx context.Context, q *queries.Queries, ...) (queries.InsertGatewayEnvelopeRow, error)
- func NewNamespacedDB(ctx context.Context, logger *zap.Logger, dsn string, namespace string, ...) (*sql.DB, error)
- func NullInt32(v int32) sql.NullInt32
- func NullInt64(v int64) sql.NullInt64
- func RunInTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) error
- func RunInTxRaw(ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) error
- func RunInTxWithResult[T any](ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) (result T, err error)
- func SetVectorClockByOriginators(q *queries.SelectGatewayEnvelopesByOriginatorsParams, vc VectorClock) *queries.SelectGatewayEnvelopesByOriginatorsParams
- func SetVectorClockByTopics(q *queries.SelectGatewayEnvelopesByTopicsParams, vc VectorClock) *queries.SelectGatewayEnvelopesByTopicsParams
- func SetVectorClockUnfiltered(q *queries.SelectGatewayEnvelopesUnfilteredParams, vc VectorClock) *queries.SelectGatewayEnvelopesUnfilteredParams
- func TransformRowsByOriginator(rows []queries.SelectGatewayEnvelopesByOriginatorsRow) []queries.GatewayEnvelopesView
- func TransformRowsByTopic(rows []queries.SelectGatewayEnvelopesByTopicsRow) []queries.GatewayEnvelopesView
- type AdvisoryLocker
- func (a *AdvisoryLocker) LockIdentityUpdateInsert(ctx context.Context, queries *queries.Queries, nodeID uint32) error
- func (a *AdvisoryLocker) TryLockAttestationWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- func (a *AdvisoryLocker) TryLockGeneratorWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- func (a *AdvisoryLocker) TryLockSettlementWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- func (a *AdvisoryLocker) TryLockSubmitterWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- type DBSubscription
- type Handler
- type HandlerOption
- type ITransactionScopedAdvisoryLocker
- type LockKind
- type PollableDBQuery
- type PollingOptions
- type Topic
- type TransactionScopedAdvisoryLocker
- func (a *TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert(nodeID uint32) error
- func (a *TransactionScopedAdvisoryLocker) Release() error
- func (a *TransactionScopedAdvisoryLocker) TryLockAttestationWorker() (bool, error)
- func (a *TransactionScopedAdvisoryLocker) TryLockGeneratorWorker() (bool, error)
- func (a *TransactionScopedAdvisoryLocker) TryLockSettlementWorker() (bool, error)
- func (a *TransactionScopedAdvisoryLocker) TryLockSubmitterWorker() (bool, error)
- type VectorClock
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ConnectToDB ¶ added in v0.4.0
func ConnectToDB(ctx context.Context, logger *zap.Logger, dsn string, namespace string, waitForDB time.Duration, statementTimeout time.Duration, prom *prometheus.Registry, ) (*sql.DB, error)
ConnectToDB establishes a connection to an existing database using the provided DSN. Unlike NewNamespacedDB, this function does not create the database or run migrations. If namespace is provided, it overrides the database name in the DSN.
func Get5MinutesOfCongestion ¶ added in v0.3.0
func Get5MinutesOfCongestion( ctx context.Context, querier *queries.Queries, originatorID, endMinute int32, ) (out [5]int32, err error)
Get5MinutesOfCongestion gets the congestion for the minute specified by `endMinute` and the previous 4 minutes returned in descending order with missing values filled with 0
func InsertGatewayEnvelopeAndIncrementUnsettledUsage ¶ added in v0.3.0
func InsertGatewayEnvelopeAndIncrementUnsettledUsage( ctx context.Context, db *sql.DB, insertParams queries.InsertGatewayEnvelopeParams, incrementParams queries.IncrementUnsettledUsageParams, ) (int64, error)
InsertGatewayEnvelopeAndIncrementUnsettledUsage inserts a gateway envelope and updates unsettled usage and congestion counters within a single database transaction.
This function runs inside a managed transaction created by RunInTxWithResult().
Steps:
- Calls InsertGatewayEnvelopeWithChecksTransactional() to insert the envelope, automatically creating any missing partitions if needed.
- If a new envelope is inserted, increments unsettled usage and congestion counters for the originator within the same transaction.
- If the envelope already exists (duplicate insert), metrics are not updated.
The function ensures atomicity between envelope insertion and usage updates. Safe for high-throughput ingest paths where message persistence and usage tracking must succeed or fail together.
func InsertGatewayEnvelopeWithChecksStandalone ¶ added in v1.0.0
func InsertGatewayEnvelopeWithChecksStandalone( ctx context.Context, q *queries.Queries, row queries.InsertGatewayEnvelopeParams, ) (queries.InsertGatewayEnvelopeRow, error)
InsertGatewayEnvelopeWithChecksStandalone inserts a gateway envelope in a non-transactional context, automatically creating missing partitions and retrying once.
Behavior:
- Performs an insert into the v2 tables.
- On “no partition of relation …” errors, creates the necessary partitions in the same connection, and retries the insert once.
This function does not use SAVEPOINTs and does not depend on an explicit transaction. It is ideal for standalone operations such as backfills, batch imports, or ingestion workers where each insert is independent of others.
func InsertGatewayEnvelopeWithChecksTransactional ¶ added in v1.0.0
func InsertGatewayEnvelopeWithChecksTransactional( ctx context.Context, q *queries.Queries, row queries.InsertGatewayEnvelopeParams, ) (queries.InsertGatewayEnvelopeRow, error)
InsertGatewayEnvelopeWithChecksTransactional attempts to insert a gateway envelope inside the current SQL transaction, with automatic partition creation and retry.
Behavior:
- Creates a SAVEPOINT before the insert so that a failure does not abort the entire tx.
- On “no partition of relation …” errors, it rolls back to the savepoint, creates the missing partition(s) using the same connection (within the tx), and retries the insert once.
- On success, the savepoint is released.
This variant must be called within an active transaction. It avoids full tx rollbacks and ensures inserts can proceed even when new partitions are required. Use for transactional ingestion flows where atomicity must be preserved.
func NewNamespacedDB ¶ added in v0.1.2
func NewNamespacedDB( ctx context.Context, logger *zap.Logger, dsn string, namespace string, waitForDB time.Duration, statementTimeout time.Duration, prom *prometheus.Registry, ) (*sql.DB, error)
NewNamespacedDB creates a new database with the given namespace if it doesn't exist and returns the full DSN for the new database.
func RunInTxRaw ¶ added in v0.5.0
func RunInTxWithResult ¶ added in v0.3.0
func SetVectorClockByOriginators ¶ added in v1.0.0
func SetVectorClockByOriginators( q *queries.SelectGatewayEnvelopesByOriginatorsParams, vc VectorClock, ) *queries.SelectGatewayEnvelopesByOriginatorsParams
func SetVectorClockByTopics ¶ added in v1.0.0
func SetVectorClockByTopics( q *queries.SelectGatewayEnvelopesByTopicsParams, vc VectorClock, ) *queries.SelectGatewayEnvelopesByTopicsParams
func SetVectorClockUnfiltered ¶ added in v1.0.0
func SetVectorClockUnfiltered( q *queries.SelectGatewayEnvelopesUnfilteredParams, vc VectorClock, ) *queries.SelectGatewayEnvelopesUnfilteredParams
func TransformRowsByOriginator ¶ added in v1.0.0
func TransformRowsByOriginator( rows []queries.SelectGatewayEnvelopesByOriginatorsRow, ) []queries.GatewayEnvelopesView
func TransformRowsByTopic ¶ added in v1.0.0
func TransformRowsByTopic( rows []queries.SelectGatewayEnvelopesByTopicsRow, ) []queries.GatewayEnvelopesView
Types ¶
type AdvisoryLocker ¶ added in v1.0.0
type AdvisoryLocker struct{}
AdvisoryLocker builds advisory-lock keys and acquires locks using the caller’s connection/transaction via the provided *queries.Queries
func NewAdvisoryLocker ¶ added in v1.0.0
func NewAdvisoryLocker() *AdvisoryLocker
func (*AdvisoryLocker) LockIdentityUpdateInsert ¶ added in v1.0.0
func (*AdvisoryLocker) TryLockAttestationWorker ¶ added in v1.1.0
func (*AdvisoryLocker) TryLockGeneratorWorker ¶ added in v1.1.0
func (*AdvisoryLocker) TryLockSettlementWorker ¶ added in v1.1.0
func (*AdvisoryLocker) TryLockSubmitterWorker ¶ added in v1.1.0
type DBSubscription ¶
type DBSubscription[ValueType any, CursorType any] struct { // contains filtered or unexported fields }
DBSubscription is a subscription that polls a DB for updates Assumes there is only one listener (updates block on a single unbuffered channel)
func NewDBSubscription ¶
func NewDBSubscription[ValueType any, CursorType any]( ctx context.Context, logger *zap.Logger, query PollableDBQuery[ValueType, CursorType], lastSeen CursorType, options PollingOptions, ) *DBSubscription[ValueType, CursorType]
func (*DBSubscription[ValueType, CursorType]) Start ¶
func (s *DBSubscription[ValueType, CursorType]) Start() (<-chan []ValueType, error)
type Handler ¶ added in v1.1.0
type Handler struct {
// contains filtered or unexported fields
}
Handler eases working with two databases - a read-write and read-only database. It mitigates the possibility of a component attempting a write to DB, not knowing it received a handle to a read-only SQL DB. Handler also makes the query intent explicit. The handler will correctly route the request to the appropriate DB. It also eases the transition if some part of the code used to do read-only access and later needs to write data.
func NewDBHandler ¶ added in v1.1.0
func NewDBHandler(db *sql.DB, options ...HandlerOption) *Handler
NewDBHandler creates a new database handler with two database connections - a read-write and a read one. If there's no exclusive read replica it can be omitted and the write replica will be used.
func (*Handler) WriteQuery ¶ added in v1.1.0
type HandlerOption ¶ added in v1.1.0
type HandlerOption func(*handlerConfig)
func WithReadReplica ¶ added in v1.1.0
func WithReadReplica(db *sql.DB) HandlerOption
type ITransactionScopedAdvisoryLocker ¶ added in v1.0.0
type LockKind ¶ added in v1.0.0
type LockKind uint8
LockKind marks the lowest 8 bits of the advisory lock key.
type PollableDBQuery ¶
type PollingOptions ¶
PollingOptions specifies the polling options for a DB subscription. It can poll whenever notified, or at an interval if not notified.
type TransactionScopedAdvisoryLocker ¶ added in v1.0.0
type TransactionScopedAdvisoryLocker struct {
// contains filtered or unexported fields
}
TransactionScopedAdvisoryLocker creates and owns a transaction; methods acquire advisory locks within that tx. Release() rolls back the tx
func NewTransactionScopedAdvisoryLocker ¶ added in v1.0.0
func (*TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert ¶ added in v1.0.0
func (a *TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert(nodeID uint32) error
func (*TransactionScopedAdvisoryLocker) Release ¶ added in v1.0.0
func (a *TransactionScopedAdvisoryLocker) Release() error
func (*TransactionScopedAdvisoryLocker) TryLockAttestationWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockAttestationWorker() (bool, error)
func (*TransactionScopedAdvisoryLocker) TryLockGeneratorWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockGeneratorWorker() (bool, error)
func (*TransactionScopedAdvisoryLocker) TryLockSettlementWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockSettlementWorker() (bool, error)
func (*TransactionScopedAdvisoryLocker) TryLockSubmitterWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockSubmitterWorker() (bool, error)
type VectorClock ¶
func ToVectorClock ¶
func ToVectorClock(rows []queries.GatewayEnvelopesLatest) VectorClock