Documentation ¶
Overview ¶
Package db provides DB access and advisory-lock helpers for coordinating HA workers. It includes two styles:
- AdvisoryLocker: inherits the caller's transaction/connection.
- TransactionScopedAdvisoryLocker: creates and owns its own transaction.
Package db implements the database connection and management.
Index ¶
- Constants
- func CalculateRowsPerEntry(numEntries int, rowLimit int32) int32
- func ConnectToDB(ctx context.Context, logger *zap.Logger, dsn string, namespace string, ...) (*sql.DB, error)
- func FillMissingOriginators(vc VectorClock, allOriginators []uint32)
- func Get5MinutesOfCongestion(ctx context.Context, querier *queries.Queries, originatorID, endMinute int32) (out [5]int32, err error)
- func InsertGatewayEnvelopeAndIncrementUnsettledUsage(ctx context.Context, db *sql.DB, ...) (int64, error)
- func InsertGatewayEnvelopeBatchAndIncrementUnsettledUsage(ctx context.Context, db *sql.DB, input *types.GatewayEnvelopeBatch) (int64, error)
- func InsertGatewayEnvelopeBatchTransactional(ctx context.Context, q *queries.Queries, input *types.GatewayEnvelopeBatch) (int64, error)
- func InsertGatewayEnvelopeWithChecksStandalone(ctx context.Context, q *queries.Queries, ...) (queries.InsertGatewayEnvelopeRow, error)
- func InsertGatewayEnvelopeWithChecksTransactional(ctx context.Context, q *queries.Queries, ...) (queries.InsertGatewayEnvelopeRow, error)
- func NewNamespacedDB(ctx context.Context, logger *zap.Logger, dsn string, namespace string, ...) (*sql.DB, error)
- func NullInt32(v int32) sql.NullInt32
- func NullInt64(v int64) sql.NullInt64
- func RunInTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) error
- func RunInTxRaw(ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) error
- func RunInTxWithResult[T any](ctx context.Context, db *sql.DB, opts *sql.TxOptions, ...) (result T, err error)
- func SetPerTopicCursors(q *queries.SelectGatewayEnvelopesByPerTopicCursorsParams, tc TopicCursors)
- func SetVectorClockByOriginators(q *queries.SelectGatewayEnvelopesByOriginatorsParams, ...) *queries.SelectGatewayEnvelopesByOriginatorsParams
- func SetVectorClockByTopics(q *queries.SelectGatewayEnvelopesByTopicsParams, vc VectorClock) *queries.SelectGatewayEnvelopesByTopicsParams
- func SetVectorClockUnfiltered(q *queries.SelectGatewayEnvelopesUnfilteredParams, vc VectorClock) *queries.SelectGatewayEnvelopesUnfilteredParams
- func TransformRowsByOriginator[T GatewayEnvelopesByOriginatorRow](rows []T) []queries.GatewayEnvelopesView
- func TransformRowsByPerTopicCursors(rows []queries.SelectGatewayEnvelopesByPerTopicCursorsRow) []queries.GatewayEnvelopesView
- func TransformRowsByTopic(rows []queries.SelectGatewayEnvelopesByTopicsRow) []queries.GatewayEnvelopesView
- type AdvisoryLocker
- func (a *AdvisoryLocker) LockIdentityUpdateInsert(ctx context.Context, queries *queries.Queries, nodeID uint32) error
- func (a *AdvisoryLocker) TryLockAttestationWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- func (a *AdvisoryLocker) TryLockGeneratorWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- func (a *AdvisoryLocker) TryLockSettlementWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- func (a *AdvisoryLocker) TryLockSubmitterWorker(ctx context.Context, queries *queries.Queries) (bool, error)
- type CachedOriginatorList
- type DBSubscription
- type GatewayEnvelopesByOriginatorRow
- type Handler
- type HandlerOption
- type ITransactionScopedAdvisoryLocker
- type LockKind
- type OriginatorLister
- type PollableDBQuery
- type PollingOptions
- type Topic
- type TopicCursors
- type TransactionScopedAdvisoryLocker
- func (a *TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert(nodeID uint32) error
- func (a *TransactionScopedAdvisoryLocker) Release() error
- func (a *TransactionScopedAdvisoryLocker) TryLockAttestationWorker() (bool, error)
- func (a *TransactionScopedAdvisoryLocker) TryLockGeneratorWorker() (bool, error)
- func (a *TransactionScopedAdvisoryLocker) TryLockSettlementWorker() (bool, error)
- func (a *TransactionScopedAdvisoryLocker) TryLockSubmitterWorker() (bool, error)
- type VectorClock
Constants ¶
const GatewayEnvelopeBandWidth int64 = 1_000_000
Variables ¶
This section is empty.
Functions ¶
func CalculateRowsPerEntry ¶ added in v1.2.0
CalculateRowsPerEntry computes the per-(topic, originator) sub-limit for the per-topic cursor query. Returns at least 10 to avoid starving low-volume originators.
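The floor described above can be illustrated with a small sketch. The even split of the row limit across entries and the name `rowsPerEntry` are assumptions made for illustration; only the minimum of 10 comes from the documentation.

```go
package main

import "fmt"

// minRowsPerEntry mirrors the documented floor of 10 rows per entry.
const minRowsPerEntry int32 = 10

// rowsPerEntry is a hypothetical stand-in for CalculateRowsPerEntry.
// Assumption: the limit is divided evenly across entries before clamping.
func rowsPerEntry(numEntries int, rowLimit int32) int32 {
	perEntry := rowLimit
	if numEntries > 0 {
		perEntry = rowLimit / int32(numEntries)
	}
	if perEntry < minRowsPerEntry {
		return minRowsPerEntry
	}
	return perEntry
}

func main() {
	fmt.Println(rowsPerEntry(4, 1000))   // 250
	fmt.Println(rowsPerEntry(500, 1000)) // 10 (2 rows per entry, clamped to the floor)
}
```

The clamp means the sum of all sub-limits can exceed the overall row limit when there are many entries, which is the price of not starving low-volume originators.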
func ConnectToDB ¶ added in v0.4.0
func ConnectToDB(ctx context.Context, logger *zap.Logger, dsn string, namespace string, waitForDB time.Duration, statementTimeout time.Duration, prom *prometheus.Registry, ) (*sql.DB, error)
ConnectToDB establishes a connection to an existing database using the provided DSN. Unlike NewNamespacedDB, this function does not create the database or run migrations. If namespace is provided, it overrides the database name in the DSN.
func FillMissingOriginators ¶ added in v1.2.0
func FillMissingOriginators(vc VectorClock, allOriginators []uint32)
FillMissingOriginators ensures that every originator from allOriginators is present in the vector clock. Missing originators are added with a sequence ID of 0, meaning "start from the beginning".
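A minimal sketch of this behavior follows. The `vectorClock` type here is an assumed shape (originator node ID mapped to last-seen sequence ID), since the page does not show the definition of VectorClock, and `fillMissing` is a hypothetical name.

```go
package main

import "fmt"

// vectorClock is an assumed shape: originator node ID -> last seen sequence ID.
type vectorClock map[uint32]uint64

// fillMissing adds every originator absent from vc with sequence ID 0
// and leaves existing entries untouched.
func fillMissing(vc vectorClock, allOriginators []uint32) {
	for _, id := range allOriginators {
		if _, ok := vc[id]; !ok {
			vc[id] = 0 // 0 means "start from the beginning"
		}
	}
}

func main() {
	vc := vectorClock{100: 42}
	fillMissing(vc, []uint32{100, 200})
	fmt.Println(vc[100], vc[200], len(vc)) // 42 0 2
}
```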
func Get5MinutesOfCongestion ¶ added in v0.3.0
func Get5MinutesOfCongestion( ctx context.Context, querier *queries.Queries, originatorID, endMinute int32, ) (out [5]int32, err error)
Get5MinutesOfCongestion returns the congestion for the minute specified by `endMinute` and for the previous 4 minutes, in descending order. Missing values are filled with 0.
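The ordering and zero-filling can be sketched as follows. The map input is a hypothetical stand-in for whatever rows the real query returns, and `last5Minutes` is an illustrative name.

```go
package main

import "fmt"

// last5Minutes returns congestion for endMinute and the 4 minutes before it,
// newest first. Minutes with no recorded value are reported as 0.
func last5Minutes(byMinute map[int32]int32, endMinute int32) (out [5]int32) {
	for i := int32(0); i < 5; i++ {
		out[i] = byMinute[endMinute-i] // a missing key yields the zero value
	}
	return out
}

func main() {
	recorded := map[int32]int32{100: 7, 98: 3}
	fmt.Println(last5Minutes(recorded, 100)) // [7 0 3 0 0]
}
```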
func InsertGatewayEnvelopeAndIncrementUnsettledUsage ¶ added in v0.3.0
func InsertGatewayEnvelopeAndIncrementUnsettledUsage( ctx context.Context, db *sql.DB, insertParams queries.InsertGatewayEnvelopeParams, incrementParams queries.IncrementUnsettledUsageParams, incrementCongestion bool, ) (int64, error)
InsertGatewayEnvelopeAndIncrementUnsettledUsage inserts a gateway envelope and updates unsettled usage and congestion counters within a single database transaction.
This function runs inside a managed transaction created by RunInTxWithResult().
Steps:
- Calls InsertGatewayEnvelopeWithChecksTransactional() to insert the envelope, automatically creating any missing partitions if needed.
- If a new envelope is inserted, increments unsettled usage and congestion counters for the originator within the same transaction.
- If the envelope already exists (duplicate insert), metrics are not updated.
The function ensures atomicity between envelope insertion and usage updates. Safe for high-throughput ingest paths where message persistence and usage tracking must succeed or fail together.
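The duplicate-handling rule in the steps above can be sketched against a small interface. Everything named here (`usageTx`, `insertAndIncrement`, `memTx`) is hypothetical, and the meaning of the returned count (number of envelopes inserted) is an assumption; the real function works on *sql.DB and generated query parameters.

```go
package main

import "fmt"

// usageTx is a hypothetical, minimal view of the work done inside one transaction.
type usageTx interface {
	InsertEnvelope(seqID uint64) (inserted bool, err error)
	IncrementUsage(spend int64) error
}

// insertAndIncrement updates counters only when the envelope is new.
// Assumption: the returned count is the number of envelopes inserted.
func insertAndIncrement(tx usageTx, seqID uint64, spend int64) (int64, error) {
	inserted, err := tx.InsertEnvelope(seqID)
	if err != nil {
		return 0, err // the caller is expected to roll back the transaction
	}
	if !inserted {
		return 0, nil // duplicate: leave the counters untouched
	}
	if err := tx.IncrementUsage(spend); err != nil {
		return 0, err
	}
	return 1, nil
}

// memTx is an in-memory fake used only to exercise the sketch.
type memTx struct {
	seen  map[uint64]bool
	usage int64
}

func (m *memTx) InsertEnvelope(seqID uint64) (bool, error) {
	if m.seen[seqID] {
		return false, nil
	}
	m.seen[seqID] = true
	return true, nil
}

func (m *memTx) IncrementUsage(spend int64) error {
	m.usage += spend
	return nil
}

func main() {
	tx := &memTx{seen: map[uint64]bool{}}
	first, _ := insertAndIncrement(tx, 1, 50)
	second, _ := insertAndIncrement(tx, 1, 50) // duplicate insert
	fmt.Println(first, second, tx.usage)       // 1 0 50
}
```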
func InsertGatewayEnvelopeBatchAndIncrementUnsettledUsage ¶ added in v1.2.0
func InsertGatewayEnvelopeBatchAndIncrementUnsettledUsage( ctx context.Context, db *sql.DB, input *types.GatewayEnvelopeBatch, ) (int64, error)
InsertGatewayEnvelopeBatchAndIncrementUnsettledUsage inserts a batch of gateway envelopes and updates unsettled usage within a single database transaction.
This is a convenience wrapper that creates its own transaction. Use InsertGatewayEnvelopeBatchTransactional when you need to participate in an existing transaction.
func InsertGatewayEnvelopeBatchTransactional ¶ added in v1.2.0
func InsertGatewayEnvelopeBatchTransactional( ctx context.Context, q *queries.Queries, input *types.GatewayEnvelopeBatch, ) (int64, error)
InsertGatewayEnvelopeBatchTransactional inserts a batch of gateway envelopes within an existing transaction.
The input is an array of originator node IDs, sequence IDs, topics, payer IDs, gateway times, expiries, originator envelopes, and spend picodollars.
The sequenceIDs are expected to be strictly ascending per originator node ID.
Payer IDs considerations:
- if not 0, they must exist.
- if 0, they are treated as null, as it's nullable in gateway_envelopes_meta.
- if 0, no unsettled usage is incremented.
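The "0 means null" convention can be expressed with `sql.NullInt32` from the standard library. The helper name `payerIDToNullable` is hypothetical and is not part of this package.

```go
package main

import (
	"database/sql"
	"fmt"
)

// payerIDToNullable maps the sentinel payer ID 0 to SQL NULL.
func payerIDToNullable(payerID int32) sql.NullInt32 {
	if payerID == 0 {
		return sql.NullInt32{} // Valid is false, so the column is written as NULL
	}
	return sql.NullInt32{Int32: payerID, Valid: true}
}

func main() {
	fmt.Println(payerIDToNullable(0).Valid, payerIDToNullable(7).Valid) // false true
}
```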
func InsertGatewayEnvelopeWithChecksStandalone ¶ added in v1.0.0
func InsertGatewayEnvelopeWithChecksStandalone( ctx context.Context, q *queries.Queries, row queries.InsertGatewayEnvelopeParams, ) (queries.InsertGatewayEnvelopeRow, error)
InsertGatewayEnvelopeWithChecksStandalone inserts a gateway envelope in a non-transactional context, automatically creating missing partitions and retrying once.
Behavior:
- Performs an insert into the v2 tables.
- On “no partition of relation …” errors, creates the necessary partitions in the same connection, and retries the insert once.
This function does not use SAVEPOINTs and does not depend on an explicit transaction. It is ideal for standalone operations such as backfills, batch imports, or ingestion workers where each insert is independent of others.
func InsertGatewayEnvelopeWithChecksTransactional ¶ added in v1.0.0
func InsertGatewayEnvelopeWithChecksTransactional( ctx context.Context, q *queries.Queries, row queries.InsertGatewayEnvelopeParams, ) (queries.InsertGatewayEnvelopeRow, error)
InsertGatewayEnvelopeWithChecksTransactional attempts to insert a gateway envelope inside the current SQL transaction, with automatic partition creation and retry.
Behavior:
- Creates a SAVEPOINT before the insert so that a failure does not abort the entire tx.
- On “no partition of relation …” errors, it rolls back to the savepoint, creates the missing partition(s) using the same connection (within the tx), and retries the insert once.
- On success, the savepoint is released.
This variant must be called within an active transaction. It avoids full tx rollbacks and ensures inserts can proceed even when new partitions are required. Use for transactional ingestion flows where atomicity must be preserved.
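The savepoint flow described above can be sketched against a minimal interface. The `execer` interface, the savepoint name, and the statement strings are placeholders chosen for illustration, not the package's actual SQL; a real caller would adapt *sql.Tx to this shape.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// execer is the minimal surface the sketch needs from a transaction.
type execer interface {
	Exec(stmt string) error
}

// insertWithSavepoint guards the insert with a savepoint and retries once
// after creating the missing partition.
func insertWithSavepoint(tx execer, insertStmt, createPartitionStmt string) error {
	if err := tx.Exec("SAVEPOINT sp_insert"); err != nil {
		return err
	}
	err := tx.Exec(insertStmt)
	if err != nil && strings.Contains(err.Error(), "no partition of relation") {
		// Undo only the failed insert so the outer transaction stays usable.
		if rbErr := tx.Exec("ROLLBACK TO SAVEPOINT sp_insert"); rbErr != nil {
			return rbErr
		}
		if cpErr := tx.Exec(createPartitionStmt); cpErr != nil {
			return cpErr
		}
		err = tx.Exec(insertStmt) // retry exactly once
	}
	if err != nil {
		return err
	}
	return tx.Exec("RELEASE SAVEPOINT sp_insert")
}

// fakeTx fails inserts until the partition has been created.
type fakeTx struct {
	partitioned bool
	log         []string
}

func (f *fakeTx) Exec(stmt string) error {
	f.log = append(f.log, stmt)
	if stmt == "INSERT" && !f.partitioned {
		return errors.New(`no partition of relation "t" found for row`)
	}
	if stmt == "CREATE PARTITION" {
		f.partitioned = true
	}
	return nil
}

func main() {
	tx := &fakeTx{}
	err := insertWithSavepoint(tx, "INSERT", "CREATE PARTITION")
	fmt.Println(err, len(tx.log)) // <nil> 6
}
```

In this sketch, any other insert error is returned as-is and the decision to roll back is left to the caller.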
func NewNamespacedDB ¶ added in v0.1.2
func NewNamespacedDB( ctx context.Context, logger *zap.Logger, dsn string, namespace string, waitForDB time.Duration, statementTimeout time.Duration, prom *prometheus.Registry, ) (*sql.DB, error)
NewNamespacedDB creates a new database with the given namespace if it doesn't exist and, as the signature indicates, returns a *sql.DB handle connected to that database.
func RunInTxRaw ¶ added in v0.5.0
func RunInTxWithResult ¶ added in v0.3.0
func SetPerTopicCursors ¶ added in v1.2.0
func SetPerTopicCursors( q *queries.SelectGatewayEnvelopesByPerTopicCursorsParams, tc TopicCursors, )
SetPerTopicCursors flattens TopicCursors into the parallel arrays required by SelectGatewayEnvelopesByPerTopicCursors. Each (topic, nodeID, seqID) triple produces one entry in the three arrays.
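The flattening can be sketched as below. The `cursorParams` struct and its field names are hypothetical stand-ins for the generated query parameters, and the `vectorClock` shape (node ID mapped to sequence ID) is an assumption. Keys are sorted only so that the sketch produces stable output.

```go
package main

import (
	"fmt"
	"sort"
)

type vectorClock map[uint32]uint64       // assumed: node ID -> sequence ID
type topicCursors map[string]vectorClock // mirrors the documented TopicCursors

// cursorParams is a hypothetical stand-in for the generated params struct.
type cursorParams struct {
	Topics  [][]byte
	NodeIDs []int32
	SeqIDs  []int64
}

// flatten emits one (topic, nodeID, seqID) entry per cursor and keeps the
// three slices index-aligned.
func flatten(p *cursorParams, tc topicCursors) {
	topics := make([]string, 0, len(tc))
	for t := range tc {
		topics = append(topics, t)
	}
	sort.Strings(topics)
	for _, t := range topics {
		nodes := make([]uint32, 0, len(tc[t]))
		for n := range tc[t] {
			nodes = append(nodes, n)
		}
		sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
		for _, n := range nodes {
			p.Topics = append(p.Topics, []byte(t))
			p.NodeIDs = append(p.NodeIDs, int32(n))
			p.SeqIDs = append(p.SeqIDs, int64(tc[t][n]))
		}
	}
}

func main() {
	tc := topicCursors{"a": {1: 10, 3: 30}, "b": {2: 20}}
	p := &cursorParams{}
	flatten(p, tc)
	fmt.Println(p.NodeIDs, p.SeqIDs) // [1 3 2] [10 30 20]
}
```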
func SetVectorClockByOriginators ¶ added in v1.0.0
func SetVectorClockByOriginators( q *queries.SelectGatewayEnvelopesByOriginatorsParams, originatorNodeIds []int32, vc VectorClock, ) *queries.SelectGatewayEnvelopesByOriginatorsParams
SetVectorClockByOriginators populates the cursor arrays for SelectGatewayEnvelopesByOriginators. All originatorNodeIds are included; positions come from vc when present, otherwise default to 0 (start from beginning).
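A minimal sketch of the defaulting rule: every requested originator gets a cursor, taken from the vector clock when present and 0 otherwise. The `vectorClock` shape and the function name `cursorsFor` are assumptions; the real function writes into the generated params struct instead of returning slices.

```go
package main

import "fmt"

// vectorClock is an assumed shape: originator node ID -> last seen sequence ID.
type vectorClock map[uint32]uint64

// cursorsFor returns index-aligned node IDs and sequence IDs for every
// requested originator, defaulting to 0 when vc has no entry.
func cursorsFor(originatorNodeIDs []int32, vc vectorClock) ([]int32, []int64) {
	nodeIDs := make([]int32, 0, len(originatorNodeIDs))
	seqIDs := make([]int64, 0, len(originatorNodeIDs))
	for _, id := range originatorNodeIDs {
		nodeIDs = append(nodeIDs, id)
		seqIDs = append(seqIDs, int64(vc[uint32(id)])) // absent key -> 0
	}
	return nodeIDs, seqIDs
}

func main() {
	nodes, seqs := cursorsFor([]int32{100, 200}, vectorClock{100: 42})
	fmt.Println(nodes, seqs) // [100 200] [42 0]
}
```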
func SetVectorClockByTopics ¶ added in v1.0.0
func SetVectorClockByTopics( q *queries.SelectGatewayEnvelopesByTopicsParams, vc VectorClock, ) *queries.SelectGatewayEnvelopesByTopicsParams
func SetVectorClockUnfiltered ¶ added in v1.0.0
func SetVectorClockUnfiltered( q *queries.SelectGatewayEnvelopesUnfilteredParams, vc VectorClock, ) *queries.SelectGatewayEnvelopesUnfilteredParams
func TransformRowsByOriginator ¶ added in v1.0.0
func TransformRowsByOriginator[T GatewayEnvelopesByOriginatorRow]( rows []T, ) []queries.GatewayEnvelopesView
func TransformRowsByPerTopicCursors ¶ added in v1.2.0
func TransformRowsByPerTopicCursors( rows []queries.SelectGatewayEnvelopesByPerTopicCursorsRow, ) []queries.GatewayEnvelopesView
TransformRowsByPerTopicCursors converts per-topic cursor rows to the common GatewayEnvelopesView type.
func TransformRowsByTopic ¶ added in v1.0.0
func TransformRowsByTopic( rows []queries.SelectGatewayEnvelopesByTopicsRow, ) []queries.GatewayEnvelopesView
Types ¶
type AdvisoryLocker ¶ added in v1.0.0
type AdvisoryLocker struct{}
AdvisoryLocker builds advisory-lock keys and acquires locks using the caller’s connection/transaction via the provided *queries.Queries.
func NewAdvisoryLocker ¶ added in v1.0.0
func NewAdvisoryLocker() *AdvisoryLocker
func (*AdvisoryLocker) LockIdentityUpdateInsert ¶ added in v1.0.0
func (*AdvisoryLocker) TryLockAttestationWorker ¶ added in v1.1.0
func (*AdvisoryLocker) TryLockGeneratorWorker ¶ added in v1.1.0
func (*AdvisoryLocker) TryLockSettlementWorker ¶ added in v1.1.0
func (*AdvisoryLocker) TryLockSubmitterWorker ¶ added in v1.1.0
type CachedOriginatorList ¶ added in v1.2.0
type CachedOriginatorList struct {
// contains filtered or unexported fields
}
CachedOriginatorList queries gateway_envelopes_latest for originator IDs and caches the result for a configurable TTL.
func NewCachedOriginatorList ¶ added in v1.2.0
func (*CachedOriginatorList) GetOriginatorNodeIDs ¶ added in v1.2.0
func (c *CachedOriginatorList) GetOriginatorNodeIDs( ctx context.Context, ) ([]uint32, error)
type DBSubscription ¶
type DBSubscription[ValueType any, CursorType any] struct { // contains filtered or unexported fields }
DBSubscription is a subscription that polls a DB for updates. It assumes there is only one listener (updates block on a single unbuffered channel).
func NewDBSubscription ¶
func NewDBSubscription[ValueType any, CursorType any]( ctx context.Context, logger *zap.Logger, query PollableDBQuery[ValueType, CursorType], lastSeen CursorType, options PollingOptions, ) *DBSubscription[ValueType, CursorType]
func (*DBSubscription[ValueType, CursorType]) Start ¶
func (s *DBSubscription[ValueType, CursorType]) Start() (<-chan []ValueType, error)
type GatewayEnvelopesByOriginatorRow ¶ added in v1.2.0
type GatewayEnvelopesByOriginatorRow interface {
queries.SelectGatewayEnvelopesBySingleOriginatorRow | queries.SelectGatewayEnvelopesByOriginatorsRow
}
type Handler ¶ added in v1.1.0
type Handler struct {
// contains filtered or unexported fields
}
Handler eases working with two databases - a read-write and read-only database. It mitigates the possibility of a component attempting a write to DB, not knowing it received a handle to a read-only SQL DB. Handler also makes the query intent explicit. The handler will correctly route the request to the appropriate DB. It also eases the transition if some part of the code used to do read-only access and later needs to write data.
func NewDBHandler ¶ added in v1.1.0
func NewDBHandler(db *sql.DB, options ...HandlerOption) *Handler
NewDBHandler creates a new database handler with two database connections: a read-write one and a read-only one. If there is no dedicated read replica, it can be omitted and the read-write connection will be used for reads as well.
func (*Handler) WriteQuery ¶ added in v1.1.0
type HandlerOption ¶ added in v1.1.0
type HandlerOption func(*handlerConfig)
func WithReadReplica ¶ added in v1.1.0
func WithReadReplica(db *sql.DB) HandlerOption
type ITransactionScopedAdvisoryLocker ¶ added in v1.0.0
type LockKind ¶ added in v1.0.0
type LockKind uint8
LockKind marks the lowest 8 bits of the advisory lock key.
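One way to picture a key whose lowest 8 bits carry the kind is the packing below. The layout of the upper bits (an identifier shifted left by 8) and the names `lockKey` and `kindOf` are assumptions for illustration only.

```go
package main

import "fmt"

// lockKind mirrors the documented uint8 kind tag.
type lockKind uint8

// lockKey packs an identifier into the upper bits and the kind into the low 8.
// Assumption: the identifier occupies the bits above the kind.
func lockKey(id uint32, kind lockKind) int64 {
	return int64(id)<<8 | int64(kind)
}

// kindOf recovers the kind from the lowest 8 bits of a key.
func kindOf(key int64) lockKind {
	return lockKind(key & 0xff)
}

func main() {
	key := lockKey(5, 2)
	fmt.Println(key, kindOf(key)) // 1282 2
}
```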
type OriginatorLister ¶ added in v1.2.0
OriginatorLister returns a list of all known originator node IDs.
type PollableDBQuery ¶
type PollingOptions ¶
PollingOptions specifies the polling options for a DB subscription. It can poll whenever notified, or at an interval if not notified.
type TopicCursors ¶ added in v1.2.0
type TopicCursors map[string]VectorClock
TopicCursors maps raw topic bytes (as string key) to a per-topic VectorClock.
type TransactionScopedAdvisoryLocker ¶ added in v1.0.0
type TransactionScopedAdvisoryLocker struct {
// contains filtered or unexported fields
}
TransactionScopedAdvisoryLocker creates and owns a transaction; its methods acquire advisory locks within that tx. Release() rolls back the tx.
func NewTransactionScopedAdvisoryLocker ¶ added in v1.0.0
func (*TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert ¶ added in v1.0.0
func (a *TransactionScopedAdvisoryLocker) LockIdentityUpdateInsert(nodeID uint32) error
func (*TransactionScopedAdvisoryLocker) Release ¶ added in v1.0.0
func (a *TransactionScopedAdvisoryLocker) Release() error
func (*TransactionScopedAdvisoryLocker) TryLockAttestationWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockAttestationWorker() (bool, error)
func (*TransactionScopedAdvisoryLocker) TryLockGeneratorWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockGeneratorWorker() (bool, error)
func (*TransactionScopedAdvisoryLocker) TryLockSettlementWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockSettlementWorker() (bool, error)
func (*TransactionScopedAdvisoryLocker) TryLockSubmitterWorker ¶ added in v1.1.0
func (a *TransactionScopedAdvisoryLocker) TryLockSubmitterWorker() (bool, error)
type VectorClock ¶
func ToVectorClock ¶
func ToVectorClock(rows []queries.GatewayEnvelopesLatest) VectorClock
Directories ¶
| Path | Synopsis |
|---|---|
| seeds | Package seeds provides functions to seed the database with test data. |
| types | Package types defines custom types for the db package. |