store

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BalanceDao

// BalanceDao is the Bun model for the 'indexer_balances' table.
// PartyID, InstrumentAdmin and InstrumentID are all tagged pk and together
// form the composite primary key.
type BalanceDao struct {
	bun.BaseModel   `bun:"table:indexer_balances"`
	PartyID         string `bun:",pk,type:varchar(255)"`
	InstrumentAdmin string `bun:",pk,type:varchar(255)"`
	InstrumentID    string `bun:",pk,type:varchar(255)"`
	// Amount is stored in a NOT NULL text column rather than a numeric one.
	// NOTE(review): presumably a decimal string, matching the signed decimal
	// deltas accepted by ApplyBalanceDelta — confirm.
	Amount          string `bun:",notnull,type:text"`
}

BalanceDao maps to the 'indexer_balances' table. The composite primary key (PartyID, InstrumentAdmin, InstrumentID) identifies one balance row per party and instrument.

type EventDao

// EventDao is the Bun model for the 'indexer_events' table.
// ContractID is the sole primary key, so each contract ID can appear in at
// most one row.
type EventDao struct {
	bun.BaseModel   `bun:"table:indexer_events"`
	ContractID      string    `bun:",pk,type:varchar(255)"`
	InstrumentID    string    `bun:",notnull,type:varchar(255)"`
	InstrumentAdmin string    `bun:",notnull,type:varchar(255)"`
	Issuer          string    `bun:",notnull,type:varchar(255)"`
	// EventType is limited to 20 characters by its column type.
	EventType       string    `bun:",notnull,type:varchar(20)"`
	// Amount is stored as text, not as a numeric column.
	Amount          string    `bun:",notnull,type:text"`
	// The pointer-typed fields below carry no notnull constraint, so they are
	// optional; a nil pointer means the value is absent.
	FromPartyID     *string   `bun:",type:varchar(255)"`
	ToPartyID       *string   `bun:",type:varchar(255)"`
	ExternalTxID    *string   `bun:",type:varchar(255)"`
	ExternalAddress *string   `bun:",type:varchar(255)"`
	Fingerprint     *string   `bun:",type:varchar(255)"`
	TxID            string    `bun:",notnull,type:varchar(255)"`
	// LedgerOffset is required.
	// NOTE(review): presumably the ledger position the event was read at, and
	// the column ListEvents orders by — confirm.
	LedgerOffset    int64     `bun:",notnull"`
	Timestamp       time.Time `bun:",notnull"`
	EffectiveTime   time.Time `bun:",notnull"`
}

EventDao maps to the 'indexer_events' table. ContractID is the idempotency key — one row per TokenTransferEvent contract.

type HoldingDao

// HoldingDao is the Bun model for the 'indexer_holdings' table.
// Rows are keyed by ContractID alone; every other column is NOT NULL.
type HoldingDao struct {
	bun.BaseModel   `bun:"table:indexer_holdings"`
	ContractID      string `bun:",pk,type:varchar(255)"`
	Owner           string `bun:",notnull,type:varchar(255)"`
	InstrumentAdmin string `bun:",notnull,type:varchar(255)"`
	InstrumentID    string `bun:",notnull,type:varchar(255)"`
	// Amount is stored as text, not as a numeric column.
	Amount          string `bun:",notnull,type:text"`
	LedgerOffset    int64  `bun:",notnull"`
}

HoldingDao maps to the 'indexer_holdings' table, which holds one row per active Utility.Registry.Holding.V0.Holding contract. Rows are inserted on CREATED events and deleted on ARCHIVED events. The amount is stored because archive events carry only the contract_id, so it must be looked up at archive time to decrement balances.

type InstrumentedStore

// InstrumentedStore wraps a PGStore and records Prometheus metrics for its
// database operations. Construct it with NewInstrumentedStore.
type InstrumentedStore struct {
	// contains filtered or unexported fields
}

InstrumentedStore wraps a PGStore and records Prometheus metrics for every database operation. It satisfies both engine.Store (write path: processor) and service.Store (read path: HTTP API).

func NewInstrumentedStore

func NewInstrumentedStore(inner *PGStore, metrics *StoreMetrics) *InstrumentedStore

NewInstrumentedStore returns a metrics-instrumented wrapper around the given PGStore.

func (*InstrumentedStore) ApplyBalanceDelta

func (s *InstrumentedStore) ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error

func (*InstrumentedStore) ApplySupplyDelta

func (s *InstrumentedStore) ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error

func (*InstrumentedStore) GetBalance

func (s *InstrumentedStore) GetBalance(ctx context.Context, partyID, admin, id string) (*indexer.Balance, error)

func (*InstrumentedStore) GetEvent

func (s *InstrumentedStore) GetEvent(ctx context.Context, contractID string) (*indexer.ParsedEvent, error)

func (*InstrumentedStore) GetToken

func (s *InstrumentedStore) GetToken(ctx context.Context, admin, id string) (*indexer.Token, error)

func (*InstrumentedStore) InsertEvent

func (s *InstrumentedStore) InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (bool, error)

func (*InstrumentedStore) InsertHolding

func (s *InstrumentedStore) InsertHolding(ctx context.Context, h *indexer.HoldingChange) error

func (*InstrumentedStore) InsertPendingOffer

func (s *InstrumentedStore) InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error

func (*InstrumentedStore) LatestOffset

func (s *InstrumentedStore) LatestOffset(ctx context.Context) (int64, error)

func (*InstrumentedStore) ListAllPendingOffers

func (s *InstrumentedStore) ListAllPendingOffers(
	ctx context.Context, p indexer.Pagination,
) ([]indexer.PendingOffer, int64, error)

func (*InstrumentedStore) ListBalancesForParty

func (s *InstrumentedStore) ListBalancesForParty(
	ctx context.Context, partyID string, p indexer.Pagination,
) ([]*indexer.Balance, int64, error)

func (*InstrumentedStore) ListBalancesForToken

func (s *InstrumentedStore) ListBalancesForToken(
	ctx context.Context, admin, id string, p indexer.Pagination,
) ([]*indexer.Balance, int64, error)

func (*InstrumentedStore) ListEvents

func (*InstrumentedStore) ListPendingOffersForParty

func (s *InstrumentedStore) ListPendingOffersForParty(
	ctx context.Context, partyID string, p indexer.Pagination,
) ([]indexer.PendingOffer, int64, error)

func (*InstrumentedStore) ListTokens

func (*InstrumentedStore) MarkOfferAccepted

func (s *InstrumentedStore) MarkOfferAccepted(ctx context.Context, contractID string) error

func (*InstrumentedStore) RunInTx

func (s *InstrumentedStore) RunInTx(ctx context.Context, fn func(ctx context.Context, tx engine.Store) error) error

RunInTx wraps the inner RunInTx so that operations executed inside the transaction are also instrumented. The fn receives an instrumentedWriteStore that wraps the transaction-scoped engine.Store directly — no type assertion needed.

func (*InstrumentedStore) SaveOffset

func (s *InstrumentedStore) SaveOffset(ctx context.Context, offset int64) error

func (*InstrumentedStore) TakeHolding

func (s *InstrumentedStore) TakeHolding(ctx context.Context, contractID string) (indexer.HoldingChange, bool, error)

func (*InstrumentedStore) UpsertToken

func (s *InstrumentedStore) UpsertToken(ctx context.Context, token *indexer.Token) error

type OffsetDao

// OffsetDao is the Bun model for the 'indexer_offsets' table, which stores
// the persisted ledger offset.
type OffsetDao struct {
	bun.BaseModel `bun:"table:indexer_offsets"`
	// ID is the primary key and defaults to 1.
	ID            int   `bun:",pk,default:1"`
	// LedgerOffset is NOT NULL and defaults to 0.
	LedgerOffset  int64 `bun:",notnull,default:0"`
}

OffsetDao maps to the 'indexer_offsets' table. A single row (ID=1) holds the latest persisted ledger offset.

type PGStore

// PGStore is the PostgreSQL-backed indexer store built on the Bun ORM.
// Construct it with NewStore, which takes the *bun.DB to operate on.
type PGStore struct {
	// contains filtered or unexported fields
}

PGStore is a PostgreSQL-backed store for the indexer, using Bun ORM. It satisfies both engine.Store (write path: processor) and service.Store (read path: HTTP).

func NewStore

func NewStore(db *bun.DB) *PGStore

NewStore creates a new Bun-backed indexer store.

func (*PGStore) ApplyBalanceDelta

func (s *PGStore) ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error

ApplyBalanceDelta adjusts a party's balance by delta (signed decimal string). Manages HolderCount on the parent token:

  • increments when balance transitions from zero to positive
  • decrements when balance transitions from positive to zero

Must be called within a RunInTx transaction so the three steps are atomic.

func (*PGStore) ApplySupplyDelta

func (s *PGStore) ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error

ApplySupplyDelta adds delta (signed decimal string) to a token's TotalSupply.

func (*PGStore) GetBalance

func (s *PGStore) GetBalance(ctx context.Context, partyID, admin, id string) (*indexer.Balance, error)

GetBalance retrieves a single balance record. Returns nil, nil when not found.

func (*PGStore) GetEvent

func (s *PGStore) GetEvent(ctx context.Context, contractID string) (*indexer.ParsedEvent, error)

GetEvent retrieves a single event by contract ID. Returns nil, nil when not found.

func (*PGStore) GetToken

func (s *PGStore) GetToken(ctx context.Context, admin, id string) (*indexer.Token, error)

GetToken retrieves token metadata by composite key. Returns nil, nil when not found.

func (*PGStore) InsertEvent

func (s *PGStore) InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (bool, error)

InsertEvent persists one ParsedEvent. Returns inserted=false when the event already exists (idempotent by ContractID).

func (*PGStore) InsertHolding

func (s *PGStore) InsertHolding(ctx context.Context, h *indexer.HoldingChange) error

InsertHolding records an active Utility.Registry.Holding contract. Idempotent by ContractID — replayed CREATED events return no error.

func (*PGStore) InsertPendingOffer

func (s *PGStore) InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error

InsertPendingOffer records a new TransferOffer with status PENDING. Idempotent by ContractID.

func (*PGStore) LatestOffset

func (s *PGStore) LatestOffset(ctx context.Context) (int64, error)

LatestOffset returns the last persisted ledger offset, or 0 on a fresh start.

func (*PGStore) ListAllPendingOffers

func (s *PGStore) ListAllPendingOffers(
	ctx context.Context, p indexer.Pagination,
) ([]indexer.PendingOffer, int64, error)

ListAllPendingOffers returns all PENDING offers across all parties, ordered by ledger_offset ASC, with pagination.

func (*PGStore) ListBalancesForParty

func (s *PGStore) ListBalancesForParty(ctx context.Context, partyID string, p indexer.Pagination) ([]*indexer.Balance, int64, error)

ListBalancesForParty returns a paginated list of all balances held by a given party. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).

func (*PGStore) ListBalancesForToken

func (s *PGStore) ListBalancesForToken(ctx context.Context, admin, id string, p indexer.Pagination) ([]*indexer.Balance, int64, error)

ListBalancesForToken returns a paginated list of all holders of a given token. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).

func (*PGStore) ListEvents

ListEvents returns a paginated, ledger_offset-ascending list of events. Zero-value EventFilter fields are ignored. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).

func (*PGStore) ListPendingOffersForParty

func (s *PGStore) ListPendingOffersForParty(
	ctx context.Context, partyID string, p indexer.Pagination,
) ([]indexer.PendingOffer, int64, error)

ListPendingOffersForParty returns PENDING offers where receiver = partyID, ordered by ledger_offset ASC, with pagination.

func (*PGStore) ListTokens

func (s *PGStore) ListTokens(ctx context.Context, p indexer.Pagination) ([]*indexer.Token, int64, error)

ListTokens returns a paginated list of all indexed tokens, ordered by first_seen_offset ASC. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).

func (*PGStore) MarkOfferAccepted

func (s *PGStore) MarkOfferAccepted(ctx context.Context, contractID string) error

MarkOfferAccepted sets a TransferOffer's status to ACCEPTED. No-op when not found.

func (*PGStore) RunInTx

func (s *PGStore) RunInTx(ctx context.Context, fn func(ctx context.Context, tx engine.Store) error) error

RunInTx executes fn inside a single database transaction. The Store passed to fn is scoped to that transaction.

func (*PGStore) SaveOffset

func (s *PGStore) SaveOffset(ctx context.Context, offset int64) error

SaveOffset upserts the single-row offset record.

func (*PGStore) TakeHolding

func (s *PGStore) TakeHolding(ctx context.Context, contractID string) (h indexer.HoldingChange, ok bool, err error)

TakeHolding deletes the holding row matching contractID and returns its owner/instrument/amount so the caller can apply the symmetric balance delta. Returns ok=false when the row does not exist (replayed ARCHIVED event).

func (*PGStore) UpsertToken

func (s *PGStore) UpsertToken(ctx context.Context, token *indexer.Token) error

UpsertToken records a token on first observation. Subsequent calls for the same composite key (InstrumentAdmin, InstrumentID) are no-ops.

type PendingOfferDao

// PendingOfferDao is the Bun model for the 'indexer_pending_offers' table.
// Rows are keyed by ContractID alone; every other column is NOT NULL.
type PendingOfferDao struct {
	bun.BaseModel   `bun:"table:indexer_pending_offers"`
	ContractID      string    `bun:",pk,type:varchar(255)"`
	// Status defaults to 'PENDING' at the column level and is limited to 20
	// characters.
	Status          string    `bun:",notnull,type:varchar(20),default:'PENDING'"`
	ReceiverPartyID string    `bun:",notnull,type:varchar(255)"`
	SenderPartyID   string    `bun:",notnull,type:varchar(255)"`
	InstrumentAdmin string    `bun:",notnull,type:varchar(255)"`
	InstrumentID    string    `bun:",notnull,type:varchar(255)"`
	// Amount is stored as text, not as a numeric column.
	Amount          string    `bun:",notnull,type:text"`
	LedgerOffset    int64     `bun:",notnull"`
	CreatedAt       time.Time `bun:",notnull"`
}

PendingOfferDao maps to the 'indexer_pending_offers' table. Rows are written on TransferOffer CREATED events and updated (status→ACCEPTED) on ARCHIVED events. Rows are never deleted — the table is a full audit log.

type StoreMetrics

// StoreMetrics groups the Prometheus collectors used by the indexer database
// layer. Both collectors are vectors partitioned by operation.
type StoreMetrics struct {
	// QueryDuration tracks database query latency partitioned by operation.
	QueryDuration *prometheus.HistogramVec

	// Errors counts database errors partitioned by operation.
	Errors *prometheus.CounterVec
}

StoreMetrics holds Prometheus collectors for the indexer database layer.

func NewNopStoreMetrics

func NewNopStoreMetrics() *StoreMetrics

NewNopStoreMetrics returns a StoreMetrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.

func NewStoreMetrics

func NewStoreMetrics(reg sharedmetrics.NamespacedRegisterer) *StoreMetrics

NewStoreMetrics registers indexer store metrics against the given registerer.

func (*StoreMetrics) IncErrors

func (m *StoreMetrics) IncErrors(op StoreOperation)

IncErrors increments the error counter for the given operation.

func (*StoreMetrics) ObserveQueryDuration

func (m *StoreMetrics) ObserveQueryDuration(op StoreOperation) prometheus.Observer

ObserveQueryDuration returns the observer for the given operation's query duration.

type StoreOperation

// StoreOperation is a string-typed name for a database operation; the
// StoreMetrics methods IncErrors and ObserveQueryDuration take one to select
// which operation a measurement belongs to.
type StoreOperation string

StoreOperation identifies a database operation for metrics labeling.

// StoreOperation values, one per store method, grouped by the path that
// uses them. The string values are snake_case forms of the method names.
const (
	// Write-path operations (processor / engine.Store).
	OpLatestOffset       StoreOperation = "latest_offset"
	OpInsertEvent        StoreOperation = "insert_event"
	OpSaveOffset         StoreOperation = "save_offset"
	OpUpsertToken        StoreOperation = "upsert_token"
	OpApplyBalanceDelta  StoreOperation = "apply_balance_delta"
	OpApplySupplyDelta   StoreOperation = "apply_supply_delta"
	OpInsertPendingOffer StoreOperation = "insert_pending_offer"
	OpMarkOfferAccepted  StoreOperation = "mark_offer_accepted"
	OpInsertHolding      StoreOperation = "insert_holding"
	OpTakeHolding        StoreOperation = "take_holding"

	// Read-path operations (HTTP API / service.Store).
	OpGetToken                StoreOperation = "get_token"
	OpListTokens              StoreOperation = "list_tokens"
	OpGetBalance              StoreOperation = "get_balance"
	OpListBalancesForParty    StoreOperation = "list_balances_for_party"
	OpListBalancesForToken    StoreOperation = "list_balances_for_token"
	OpGetEvent                StoreOperation = "get_event"
	OpListEvents              StoreOperation = "list_events"
	// OpListPendingOffersForPty abbreviates "Party" in the identifier only;
	// the string value spells it out in full.
	OpListPendingOffersForPty StoreOperation = "list_pending_offers_for_party"
	OpListAllPendingOffers    StoreOperation = "list_all_pending_offers"
)

type TokenDao

// TokenDao is the Bun model for the 'indexer_tokens' table.
// InstrumentAdmin and InstrumentID are both tagged pk and together form the
// composite primary key.
type TokenDao struct {
	bun.BaseModel   `bun:"table:indexer_tokens"`
	InstrumentAdmin string    `bun:",pk,type:varchar(255)"`
	InstrumentID    string    `bun:",pk,type:varchar(255)"`
	Issuer          string    `bun:",notnull,type:varchar(255)"`
	// TotalSupply is stored as text and defaults to '0'.
	TotalSupply     string    `bun:",notnull,type:text,default:'0'"`
	// HolderCount defaults to 0.
	HolderCount     int64     `bun:",notnull,default:0"`
	FirstSeenOffset int64     `bun:",notnull"`
	FirstSeenAt     time.Time `bun:",notnull"`
}

TokenDao maps to the 'indexer_tokens' table. The composite key (InstrumentAdmin, InstrumentID) uniquely identifies a token.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL