Documentation
¶
Index ¶
- type BalanceDao
- type EventDao
- type HoldingDao
- type InstrumentedStore
- func (s *InstrumentedStore) ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error
- func (s *InstrumentedStore) ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error
- func (s *InstrumentedStore) GetBalance(ctx context.Context, partyID, admin, id string) (*indexer.Balance, error)
- func (s *InstrumentedStore) GetEvent(ctx context.Context, contractID string) (*indexer.ParsedEvent, error)
- func (s *InstrumentedStore) GetToken(ctx context.Context, admin, id string) (*indexer.Token, error)
- func (s *InstrumentedStore) InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (bool, error)
- func (s *InstrumentedStore) InsertHolding(ctx context.Context, h *indexer.HoldingChange) error
- func (s *InstrumentedStore) InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error
- func (s *InstrumentedStore) LatestOffset(ctx context.Context) (int64, error)
- func (s *InstrumentedStore) ListAllPendingOffers(ctx context.Context, p indexer.Pagination) ([]indexer.PendingOffer, int64, error)
- func (s *InstrumentedStore) ListBalancesForParty(ctx context.Context, partyID string, p indexer.Pagination) ([]*indexer.Balance, int64, error)
- func (s *InstrumentedStore) ListBalancesForToken(ctx context.Context, admin, id string, p indexer.Pagination) ([]*indexer.Balance, int64, error)
- func (s *InstrumentedStore) ListEvents(ctx context.Context, f indexer.EventFilter, p indexer.Pagination) ([]*indexer.ParsedEvent, int64, error)
- func (s *InstrumentedStore) ListPendingOffersForParty(ctx context.Context, partyID string, p indexer.Pagination) ([]indexer.PendingOffer, int64, error)
- func (s *InstrumentedStore) ListTokens(ctx context.Context, p indexer.Pagination) ([]*indexer.Token, int64, error)
- func (s *InstrumentedStore) MarkOfferAccepted(ctx context.Context, contractID string) error
- func (s *InstrumentedStore) RunInTx(ctx context.Context, fn func(ctx context.Context, tx engine.Store) error) error
- func (s *InstrumentedStore) SaveOffset(ctx context.Context, offset int64) error
- func (s *InstrumentedStore) TakeHolding(ctx context.Context, contractID string) (indexer.HoldingChange, bool, error)
- func (s *InstrumentedStore) UpsertToken(ctx context.Context, token *indexer.Token) error
- type OffsetDao
- type PGStore
- func (s *PGStore) ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error
- func (s *PGStore) ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error
- func (s *PGStore) GetBalance(ctx context.Context, partyID, admin, id string) (*indexer.Balance, error)
- func (s *PGStore) GetEvent(ctx context.Context, contractID string) (*indexer.ParsedEvent, error)
- func (s *PGStore) GetToken(ctx context.Context, admin, id string) (*indexer.Token, error)
- func (s *PGStore) InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (bool, error)
- func (s *PGStore) InsertHolding(ctx context.Context, h *indexer.HoldingChange) error
- func (s *PGStore) InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error
- func (s *PGStore) LatestOffset(ctx context.Context) (int64, error)
- func (s *PGStore) ListAllPendingOffers(ctx context.Context, p indexer.Pagination) ([]indexer.PendingOffer, int64, error)
- func (s *PGStore) ListBalancesForParty(ctx context.Context, partyID string, p indexer.Pagination) ([]*indexer.Balance, int64, error)
- func (s *PGStore) ListBalancesForToken(ctx context.Context, admin, id string, p indexer.Pagination) ([]*indexer.Balance, int64, error)
- func (s *PGStore) ListEvents(ctx context.Context, f indexer.EventFilter, p indexer.Pagination) ([]*indexer.ParsedEvent, int64, error)
- func (s *PGStore) ListPendingOffersForParty(ctx context.Context, partyID string, p indexer.Pagination) ([]indexer.PendingOffer, int64, error)
- func (s *PGStore) ListTokens(ctx context.Context, p indexer.Pagination) ([]*indexer.Token, int64, error)
- func (s *PGStore) MarkOfferAccepted(ctx context.Context, contractID string) error
- func (s *PGStore) RunInTx(ctx context.Context, fn func(ctx context.Context, tx engine.Store) error) error
- func (s *PGStore) SaveOffset(ctx context.Context, offset int64) error
- func (s *PGStore) TakeHolding(ctx context.Context, contractID string) (h indexer.HoldingChange, ok bool, err error)
- func (s *PGStore) UpsertToken(ctx context.Context, token *indexer.Token) error
- type PendingOfferDao
- type StoreMetrics
- type StoreOperation
- type TokenDao
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BalanceDao ¶
type BalanceDao struct {
	// Rows are stored in the indexer_balances table.
	bun.BaseModel `bun:"table:indexer_balances"`

	// PartyID, InstrumentAdmin and InstrumentID are each tagged pk, so together
	// they form the composite primary key of the table.
	PartyID         string `bun:",pk,type:varchar(255)"`
	InstrumentAdmin string `bun:",pk,type:varchar(255)"`
	InstrumentID    string `bun:",pk,type:varchar(255)"`

	// Amount is a required text column rather than a numeric one.
	// NOTE(review): presumably a decimal string, matching the signed
	// decimal-string delta accepted by ApplyBalanceDelta — confirm.
	Amount string `bun:",notnull,type:text"`
}
BalanceDao maps to the 'indexer_balances' table. The composite key (PartyID, InstrumentAdmin, InstrumentID) uniquely identifies one party's balance in one instrument.
type EventDao ¶
type EventDao struct {
	// Rows are stored in the indexer_events table.
	bun.BaseModel `bun:"table:indexer_events"`

	// ContractID is the sole primary key, so at most one row can exist per
	// contract ID.
	ContractID string `bun:",pk,type:varchar(255)"`

	// Required (notnull) columns describing the instrument and the event.
	// EventType is limited to 20 characters by its column type.
	InstrumentID    string `bun:",notnull,type:varchar(255)"`
	InstrumentAdmin string `bun:",notnull,type:varchar(255)"`
	Issuer          string `bun:",notnull,type:varchar(255)"`
	EventType       string `bun:",notnull,type:varchar(20)"`

	// Amount is a required text column rather than a numeric one.
	// NOTE(review): presumably a decimal string — confirm.
	Amount string `bun:",notnull,type:text"`

	// Optional columns: pointer-typed and declared without a notnull tag.
	// NOTE(review): presumably a nil pointer is persisted as NULL — confirm
	// against bun's handling of pointer fields.
	FromPartyID     *string `bun:",type:varchar(255)"`
	ToPartyID       *string `bun:",type:varchar(255)"`
	ExternalTxID    *string `bun:",type:varchar(255)"`
	ExternalAddress *string `bun:",type:varchar(255)"`
	Fingerprint     *string `bun:",type:varchar(255)"`

	// Required (notnull) transaction, ledger-position and time columns.
	TxID          string    `bun:",notnull,type:varchar(255)"`
	LedgerOffset  int64     `bun:",notnull"`
	Timestamp     time.Time `bun:",notnull"`
	EffectiveTime time.Time `bun:",notnull"`
}
EventDao maps to the 'indexer_events' table. ContractID is the idempotency key — one row per TokenTransferEvent contract.
type HoldingDao ¶
type HoldingDao struct {
	// Rows are stored in the indexer_holdings table.
	bun.BaseModel `bun:"table:indexer_holdings"`

	// ContractID is the sole primary key: one row per holding contract.
	ContractID string `bun:",pk,type:varchar(255)"`

	// Owner, InstrumentAdmin, InstrumentID and Amount are the values
	// TakeHolding hands back when the row is deleted, so the caller can apply
	// the symmetric balance delta.
	Owner           string `bun:",notnull,type:varchar(255)"`
	InstrumentAdmin string `bun:",notnull,type:varchar(255)"`
	InstrumentID    string `bun:",notnull,type:varchar(255)"`
	// Amount is a required text column rather than a numeric one.
	// NOTE(review): presumably a decimal string — confirm.
	Amount string `bun:",notnull,type:text"`

	// LedgerOffset is a required column.
	// NOTE(review): presumably the offset of the CREATED event — confirm.
	LedgerOffset int64 `bun:",notnull"`
}
HoldingDao maps to the 'indexer_holdings' table. One row per active Utility.Registry.Holding.V0.Holding contract. Inserted on CREATED events and deleted on ARCHIVED events; the stored amount is needed at archive time to decrement balances since archive events carry only contract_id.
type InstrumentedStore ¶
type InstrumentedStore struct {
// contains filtered or unexported fields
}
InstrumentedStore wraps a PGStore and records Prometheus metrics for every database operation. It satisfies both engine.Store (write path: processor) and service.Store (read path: HTTP API).
func NewInstrumentedStore ¶
func NewInstrumentedStore(inner *PGStore, metrics *StoreMetrics) *InstrumentedStore
NewInstrumentedStore returns a metrics-instrumented wrapper around the given PGStore.
func (*InstrumentedStore) ApplyBalanceDelta ¶
func (s *InstrumentedStore) ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error
func (*InstrumentedStore) ApplySupplyDelta ¶
func (s *InstrumentedStore) ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error
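func (*InstrumentedStore) GetBalance ¶
func (s *InstrumentedStore) GetBalance(ctx context.Context, partyID, admin, id string) (*indexer.Balance, error)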
func (*InstrumentedStore) GetEvent ¶
func (s *InstrumentedStore) GetEvent(ctx context.Context, contractID string) (*indexer.ParsedEvent, error)
func (*InstrumentedStore) InsertEvent ¶
func (s *InstrumentedStore) InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (bool, error)
func (*InstrumentedStore) InsertHolding ¶
func (s *InstrumentedStore) InsertHolding(ctx context.Context, h *indexer.HoldingChange) error
func (*InstrumentedStore) InsertPendingOffer ¶
func (s *InstrumentedStore) InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error
func (*InstrumentedStore) LatestOffset ¶
func (s *InstrumentedStore) LatestOffset(ctx context.Context) (int64, error)
func (*InstrumentedStore) ListAllPendingOffers ¶
func (s *InstrumentedStore) ListAllPendingOffers( ctx context.Context, p indexer.Pagination, ) ([]indexer.PendingOffer, int64, error)
func (*InstrumentedStore) ListBalancesForParty ¶
func (s *InstrumentedStore) ListBalancesForParty( ctx context.Context, partyID string, p indexer.Pagination, ) ([]*indexer.Balance, int64, error)
func (*InstrumentedStore) ListBalancesForToken ¶
func (s *InstrumentedStore) ListBalancesForToken( ctx context.Context, admin, id string, p indexer.Pagination, ) ([]*indexer.Balance, int64, error)
func (*InstrumentedStore) ListEvents ¶
func (s *InstrumentedStore) ListEvents( ctx context.Context, f indexer.EventFilter, p indexer.Pagination, ) ([]*indexer.ParsedEvent, int64, error)
func (*InstrumentedStore) ListPendingOffersForParty ¶
func (s *InstrumentedStore) ListPendingOffersForParty( ctx context.Context, partyID string, p indexer.Pagination, ) ([]indexer.PendingOffer, int64, error)
func (*InstrumentedStore) ListTokens ¶
func (s *InstrumentedStore) ListTokens(ctx context.Context, p indexer.Pagination) ([]*indexer.Token, int64, error)
func (*InstrumentedStore) MarkOfferAccepted ¶
func (s *InstrumentedStore) MarkOfferAccepted(ctx context.Context, contractID string) error
func (*InstrumentedStore) RunInTx ¶
func (s *InstrumentedStore) RunInTx(ctx context.Context, fn func(ctx context.Context, tx engine.Store) error) error
RunInTx wraps the inner RunInTx so that operations executed inside the transaction are also instrumented. The fn receives an instrumentedWriteStore that wraps the transaction-scoped engine.Store directly — no type assertion needed.
func (*InstrumentedStore) SaveOffset ¶
func (s *InstrumentedStore) SaveOffset(ctx context.Context, offset int64) error
func (*InstrumentedStore) TakeHolding ¶
func (s *InstrumentedStore) TakeHolding(ctx context.Context, contractID string) (indexer.HoldingChange, bool, error)
func (*InstrumentedStore) UpsertToken ¶
func (s *InstrumentedStore) UpsertToken(ctx context.Context, token *indexer.Token) error
type OffsetDao ¶
type OffsetDao struct {
	// Rows are stored in the indexer_offsets table.
	bun.BaseModel `bun:"table:indexer_offsets"`

	// ID is the primary key and defaults to 1; the table is documented as
	// holding a single row with ID=1.
	ID int `bun:",pk,default:1"`

	// LedgerOffset is required and defaults to 0, which is also the value
	// LatestOffset is documented to report on a fresh start.
	LedgerOffset int64 `bun:",notnull,default:0"`
}
OffsetDao maps to the 'indexer_offsets' table. A single row (ID=1) holds the latest persisted ledger offset.
type PGStore ¶
type PGStore struct {
// contains filtered or unexported fields
}
PGStore is a PostgreSQL-backed store for the indexer, using Bun ORM. It satisfies both engine.Store (write path: processor) and service.Store (read path: HTTP API).
func (*PGStore) ApplyBalanceDelta ¶
func (s *PGStore) ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error
ApplyBalanceDelta adjusts a party's balance by delta (signed decimal string). Manages HolderCount on the parent token:
- increments when balance transitions from zero to positive
- decrements when balance transitions from positive to zero
Must be called within a RunInTx transaction so the three steps are atomic.
func (*PGStore) ApplySupplyDelta ¶
func (s *PGStore) ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error
ApplySupplyDelta adds delta (signed decimal string) to a token's TotalSupply.
func (*PGStore) GetBalance ¶
func (s *PGStore) GetBalance(ctx context.Context, partyID, admin, id string) (*indexer.Balance, error)
GetBalance retrieves a single balance record. Returns nil, nil when not found.
func (*PGStore) GetEvent ¶
func (s *PGStore) GetEvent(ctx context.Context, contractID string) (*indexer.ParsedEvent, error)
GetEvent retrieves a single event by contract ID. Returns nil, nil when not found.
func (*PGStore) GetToken ¶
func (s *PGStore) GetToken(ctx context.Context, admin, id string) (*indexer.Token, error)
GetToken retrieves token metadata by composite key. Returns nil, nil when not found.
func (*PGStore) InsertEvent ¶
func (s *PGStore) InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (bool, error)
InsertEvent persists one ParsedEvent. Returns inserted=false when the event already exists (idempotent by ContractID).
func (*PGStore) InsertHolding ¶
func (s *PGStore) InsertHolding(ctx context.Context, h *indexer.HoldingChange) error
InsertHolding records an active Utility.Registry.Holding contract. Idempotent by ContractID — replayed CREATED events return no error.
func (*PGStore) InsertPendingOffer ¶
func (s *PGStore) InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error
InsertPendingOffer records a new TransferOffer with status PENDING. Idempotent by ContractID.
func (*PGStore) LatestOffset ¶
func (s *PGStore) LatestOffset(ctx context.Context) (int64, error)
LatestOffset returns the last persisted ledger offset, or 0 on a fresh start.
func (*PGStore) ListAllPendingOffers ¶
func (s *PGStore) ListAllPendingOffers( ctx context.Context, p indexer.Pagination, ) ([]indexer.PendingOffer, int64, error)
ListAllPendingOffers returns all PENDING offers across all parties, ordered by ledger_offset ASC, with pagination.
func (*PGStore) ListBalancesForParty ¶
func (s *PGStore) ListBalancesForParty(ctx context.Context, partyID string, p indexer.Pagination) ([]*indexer.Balance, int64, error)
ListBalancesForParty returns a paginated list of all holdings for a given party. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).
func (*PGStore) ListBalancesForToken ¶
func (s *PGStore) ListBalancesForToken(ctx context.Context, admin, id string, p indexer.Pagination) ([]*indexer.Balance, int64, error)
ListBalancesForToken returns a paginated list of all holders of a given token. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).
func (*PGStore) ListEvents ¶
func (s *PGStore) ListEvents(ctx context.Context, f indexer.EventFilter, p indexer.Pagination) ([]*indexer.ParsedEvent, int64, error)
ListEvents returns a paginated, ledger_offset-ascending list of events. Zero-value EventFilter fields are ignored. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).
func (*PGStore) ListPendingOffersForParty ¶
func (s *PGStore) ListPendingOffersForParty( ctx context.Context, partyID string, p indexer.Pagination, ) ([]indexer.PendingOffer, int64, error)
ListPendingOffersForParty returns PENDING offers where receiver = partyID, ordered by ledger_offset ASC, with pagination.
func (*PGStore) ListTokens ¶
func (s *PGStore) ListTokens(ctx context.Context, p indexer.Pagination) ([]*indexer.Token, int64, error)
ListTokens returns a paginated list of all indexed tokens, ordered by first_seen_offset ASC. The Count and Scan are executed within a single read-only transaction so the total and the page are derived from the same consistent snapshot (see runReadTx).
func (*PGStore) MarkOfferAccepted ¶
func (s *PGStore) MarkOfferAccepted(ctx context.Context, contractID string) error
MarkOfferAccepted sets a TransferOffer's status to ACCEPTED. No-op when not found.
func (*PGStore) RunInTx ¶
func (s *PGStore) RunInTx(ctx context.Context, fn func(ctx context.Context, tx engine.Store) error) error
RunInTx executes fn inside a single database transaction. The Store passed to fn is scoped to that transaction.
func (*PGStore) SaveOffset ¶
func (s *PGStore) SaveOffset(ctx context.Context, offset int64) error
SaveOffset upserts the single-row offset record.
func (*PGStore) TakeHolding ¶
func (s *PGStore) TakeHolding(ctx context.Context, contractID string) (h indexer.HoldingChange, ok bool, err error)
TakeHolding deletes the holding row matching contractID and returns its owner/instrument/amount so the caller can apply the symmetric balance delta. Returns ok=false when the row does not exist (replayed ARCHIVED event).
type PendingOfferDao ¶
type PendingOfferDao struct {
	// Rows are stored in the indexer_pending_offers table.
	bun.BaseModel `bun:"table:indexer_pending_offers"`

	// ContractID is the sole primary key: one row per offer contract.
	ContractID string `bun:",pk,type:varchar(255)"`

	// Status is required, at most 20 characters, and defaults to 'PENDING'.
	// MarkOfferAccepted is documented to set it to ACCEPTED.
	Status string `bun:",notnull,type:varchar(20),default:'PENDING'"`

	// ReceiverPartyID is the column ListPendingOffersForParty is documented to
	// match against its partyID argument.
	ReceiverPartyID string `bun:",notnull,type:varchar(255)"`
	SenderPartyID   string `bun:",notnull,type:varchar(255)"`
	InstrumentAdmin string `bun:",notnull,type:varchar(255)"`
	InstrumentID    string `bun:",notnull,type:varchar(255)"`

	// Amount is a required text column rather than a numeric one.
	// NOTE(review): presumably a decimal string — confirm.
	Amount string `bun:",notnull,type:text"`

	// LedgerOffset is the documented sort key (ascending) of both
	// pending-offer list queries.
	LedgerOffset int64 `bun:",notnull"`

	// CreatedAt is a required timestamp column.
	// NOTE(review): source of the timestamp is not visible here — confirm.
	CreatedAt time.Time `bun:",notnull"`
}
PendingOfferDao maps to the 'indexer_pending_offers' table. Rows are written on TransferOffer CREATED events and updated (status→ACCEPTED) on ARCHIVED events. Rows are never deleted — the table is a full audit log.
type StoreMetrics ¶
type StoreMetrics struct {
	// QueryDuration tracks database query latency partitioned by operation.
	// ObserveQueryDuration returns the observer for a given StoreOperation.
	QueryDuration *prometheus.HistogramVec
	// Errors counts database errors partitioned by operation.
	// IncErrors increments the counter for a given StoreOperation.
	Errors *prometheus.CounterVec
}
StoreMetrics holds Prometheus collectors for the indexer database layer.
func NewNopStoreMetrics ¶
func NewNopStoreMetrics() *StoreMetrics
NewNopStoreMetrics returns a StoreMetrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.
func NewStoreMetrics ¶
func NewStoreMetrics(reg sharedmetrics.NamespacedRegisterer) *StoreMetrics
NewStoreMetrics registers indexer store metrics against the given registerer.
func (*StoreMetrics) IncErrors ¶
func (m *StoreMetrics) IncErrors(op StoreOperation)
IncErrors increments the error counter for the given operation.
func (*StoreMetrics) ObserveQueryDuration ¶
func (m *StoreMetrics) ObserveQueryDuration(op StoreOperation) prometheus.Observer
ObserveQueryDuration returns the observer for the given operation's query duration.
type StoreOperation ¶
type StoreOperation string
StoreOperation identifies a database operation for metrics labeling.
// Each value below is the snake_case form of the store method it labels.
// This block declares no constant for RunInTx.
const (
	// Write-path operations (processor / engine.Store).
	OpLatestOffset       StoreOperation = "latest_offset"
	OpInsertEvent        StoreOperation = "insert_event"
	OpSaveOffset         StoreOperation = "save_offset"
	OpUpsertToken        StoreOperation = "upsert_token"
	OpApplyBalanceDelta  StoreOperation = "apply_balance_delta"
	OpApplySupplyDelta   StoreOperation = "apply_supply_delta"
	OpInsertPendingOffer StoreOperation = "insert_pending_offer"
	OpMarkOfferAccepted  StoreOperation = "mark_offer_accepted"
	OpInsertHolding      StoreOperation = "insert_holding"
	OpTakeHolding        StoreOperation = "take_holding"

	// Read-path operations (HTTP API / service.Store).
	OpGetToken             StoreOperation = "get_token"
	OpListTokens           StoreOperation = "list_tokens"
	OpGetBalance           StoreOperation = "get_balance"
	OpListBalancesForParty StoreOperation = "list_balances_for_party"
	OpListBalancesForToken StoreOperation = "list_balances_for_token"
	OpGetEvent             StoreOperation = "get_event"
	OpListEvents           StoreOperation = "list_events"
	// The identifier abbreviates "Party" to "Pty"; the label value does not.
	OpListPendingOffersForPty StoreOperation = "list_pending_offers_for_party"
	OpListAllPendingOffers    StoreOperation = "list_all_pending_offers"
)
type TokenDao ¶
type TokenDao struct {
	// Rows are stored in the indexer_tokens table.
	bun.BaseModel `bun:"table:indexer_tokens"`

	// InstrumentAdmin and InstrumentID are both tagged pk and together form
	// the composite primary key.
	InstrumentAdmin string `bun:",pk,type:varchar(255)"`
	InstrumentID    string `bun:",pk,type:varchar(255)"`

	Issuer string `bun:",notnull,type:varchar(255)"`

	// TotalSupply is a required text column defaulting to '0'.
	// ApplySupplyDelta is documented to add a signed decimal-string delta to it.
	TotalSupply string `bun:",notnull,type:text,default:'0'"`

	// HolderCount is required and defaults to 0. ApplyBalanceDelta is
	// documented to increment it when a balance goes from zero to positive
	// and decrement it when a balance goes from positive to zero.
	HolderCount int64 `bun:",notnull,default:0"`

	// FirstSeenOffset is the documented sort key (ascending) of ListTokens.
	FirstSeenOffset int64     `bun:",notnull"`
	FirstSeenAt     time.Time `bun:",notnull"`
}
TokenDao maps to the 'indexer_tokens' table. The composite key (InstrumentAdmin, InstrumentID) uniquely identifies a token.