Documentation ¶
Index ¶
- Variables
- func NewHoldingDecoder(packageID string, logger *zap.Logger) ...
- func NewMultiDecoder(...) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (any, bool)
- func NewOfferDecoder(packageID string, logger *zap.Logger) ...
- func NewTokenTransferDecoder(mode indexer.FilterMode, allowed []indexer.InstrumentKey, logger *zap.Logger) ...
- type EventFetcher
- type Fetcher
- type Metrics
- type Processor
- type Store
Variables ¶
var ErrNegativeBalance = errors.New("negative balance")
ErrNegativeBalance is returned by Store.ApplyBalanceDelta when the delta would make the balance negative. Exposed so the processor can distinguish incomplete cross-participant history from actual store errors.
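A caller can separate the two failure classes with errors.Is. The sketch below is only an illustration: applyDelta is a hypothetical stand-in for Store.ApplyBalanceDelta that uses int64 amounts instead of decimal strings, and classify and its labels are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNegativeBalance mirrors the package-level sentinel documented above.
var ErrNegativeBalance = errors.New("negative balance")

// applyDelta is a hypothetical stand-in for Store.ApplyBalanceDelta.
// It wraps the sentinel so callers can still match it with errors.Is.
func applyDelta(balance, delta int64) (int64, error) {
	if balance+delta < 0 {
		return balance, fmt.Errorf("apply delta %d: %w", delta, ErrNegativeBalance)
	}
	return balance + delta, nil
}

// classify shows how a caller might tell incomplete history apart from
// a genuine store failure. The labels are illustrative only.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNegativeBalance):
		return "incomplete-history"
	default:
		return "store-error"
	}
}

func main() {
	_, err := applyDelta(5, -10)
	fmt.Println(classify(err))

	_, err = applyDelta(5, -3)
	fmt.Println(classify(err))
}
```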
Functions ¶
func NewHoldingDecoder ¶
func NewHoldingDecoder( packageID string, logger *zap.Logger, ) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.HoldingChange, bool)
NewHoldingDecoder returns a decode function for Utility.Registry.Holding.V0.Holding CREATED and ARCHIVED events. When packageID is empty the feature is disabled and the decode function returns nil, false. The decoder lets the indexer maintain indexer_balances for Utility.Registry instruments (e.g. USDCx), which do not emit a separate TokenTransferEvent contract.
The Holding template's create_arguments are {operator, provider, registrar, owner, instrument{source,id,scheme}, label, amount, lock}. The Splice HoldingV1 view derives instrumentId.admin from `registrar` and instrumentId.id from `instrument.id` — the decoder mirrors that mapping so balances keyed by (admin, id) line up with the per-instrument balance table.
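The field mapping described above can be sketched as follows. The types holdingArgs and instrumentKey are hypothetical, trimmed stand-ins for the real create_arguments and indexer key types, which are not shown in this documentation.

```go
package main

import "fmt"

// holdingArgs is a hypothetical, trimmed view of the Holding create_arguments.
type holdingArgs struct {
	Registrar    string
	Owner        string
	InstrumentID string // corresponds to instrument.id
	Amount       string
}

// instrumentKey is a hypothetical (admin, id) key for the balance table.
type instrumentKey struct {
	Admin string
	ID    string
}

// keyFor derives the balance key the same way the text describes:
// admin comes from registrar, id comes from instrument.id.
func keyFor(a holdingArgs) instrumentKey {
	return instrumentKey{Admin: a.Registrar, ID: a.InstrumentID}
}

func main() {
	args := holdingArgs{
		Registrar:    "registrar-party",
		Owner:        "alice",
		InstrumentID: "USDCx",
		Amount:       "12.50",
	}
	fmt.Printf("%+v\n", keyFor(args))
}
```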
func NewMultiDecoder ¶
func NewMultiDecoder( transferDecode func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.ParsedEvent, bool), offerDecode func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.PendingOffer, bool), holdingDecode func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.HoldingChange, bool), ) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (any, bool)
NewMultiDecoder wraps the TokenTransfer, Offer, and Holding decoders into a single any-typed decode function. The Holding decoder is only consulted when both prior decoders miss — TokenTransferEvents and TransferOffers never collide with Holding templates, so the order is purely a fast-path optimization for the common case.
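The consultation order can be illustrated with a small sketch. Everything here is hypothetical: event, the three payload types, and the match helper are simplified stand-ins for the streaming and indexer types, and combine is not the package's actual implementation.

```go
package main

import "fmt"

// Hypothetical stand-ins for the ledger event and the decoded payloads.
type event struct{ Template string }
type transferEvent struct{}
type pendingOffer struct{}
type holdingChange struct{}

// combine tries the transfer decoder, then the offer decoder, and only
// consults the holding decoder when both earlier decoders miss.
func combine(
	transfer func(event) (*transferEvent, bool),
	offer func(event) (*pendingOffer, bool),
	holding func(event) (*holdingChange, bool),
) func(event) (any, bool) {
	return func(e event) (any, bool) {
		if v, ok := transfer(e); ok {
			return v, true
		}
		if v, ok := offer(e); ok {
			return v, true
		}
		if v, ok := holding(e); ok {
			return v, true
		}
		// Return an untyped nil so callers never see a typed nil pointer.
		return nil, false
	}
}

// match builds a toy decoder that accepts exactly one template name.
func match[T any](template string) func(event) (*T, bool) {
	return func(e event) (*T, bool) {
		if e.Template != template {
			return nil, false
		}
		return new(T), true
	}
}

// newDemoDecoder wires three toy decoders together for the example.
func newDemoDecoder() func(event) (any, bool) {
	return combine(
		match[transferEvent]("TokenTransferEvent"),
		match[pendingOffer]("TransferOffer"),
		match[holdingChange]("Holding"),
	)
}

func main() {
	decode := newDemoDecoder()
	for _, name := range []string{"TokenTransferEvent", "TransferOffer", "Holding", "Other"} {
		v, ok := decode(event{Template: name})
		fmt.Printf("%s -> %T %v\n", name, v, ok)
	}
}
```

A consumer would typically switch on the dynamic type of the returned value to route it to the right handler.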
func NewOfferDecoder ¶
func NewOfferDecoder( packageID string, logger *zap.Logger, ) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.PendingOffer, bool)
NewOfferDecoder returns a decode function for TransferOffer CREATED and ARCHIVED events. When packageID is empty the feature is disabled and the decode function returns nil, false.
func NewTokenTransferDecoder ¶
func NewTokenTransferDecoder( mode indexer.FilterMode, allowed []indexer.InstrumentKey, logger *zap.Logger, ) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.ParsedEvent, bool)
NewTokenTransferDecoder returns a decode function for use with streaming.NewStream.
The closure:
- skips archived events
- checks ModuleName == "CIP56.Events" && TemplateName == "TokenTransferEvent"
- checks the event's instrument against the allowed list when mode is FilterModeWhitelist
- extracts all fields into a *ParsedEvent
- returns nil, false for invalid events (both parties absent, filter miss)
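The steps listed above can be sketched as a closure. This is a simplified illustration, not the package's code: ledgerEvent, parsedEvent, instrumentKey, and the filterModeAll constant are hypothetical stand-ins, and the field names are assumptions.

```go
package main

import "fmt"

type filterMode int

const (
	filterModeAll       filterMode = iota // hypothetical "no filtering" mode
	filterModeWhitelist                   // mirrors FilterModeWhitelist
)

type instrumentKey struct{ Admin, ID string }

// ledgerEvent and parsedEvent are hypothetical, trimmed stand-ins.
type ledgerEvent struct {
	Archived                 bool
	ModuleName, TemplateName string
	Sender, Receiver         string
	Instrument               instrumentKey
	Amount                   string
}

type parsedEvent struct {
	Sender, Receiver string
	Instrument       instrumentKey
	Amount           string
}

func newDecoder(mode filterMode, allowed []instrumentKey) func(*ledgerEvent) (*parsedEvent, bool) {
	// Build the lookup set once so each event costs a single map access.
	allow := make(map[instrumentKey]struct{}, len(allowed))
	for _, k := range allowed {
		allow[k] = struct{}{}
	}
	return func(ev *ledgerEvent) (*parsedEvent, bool) {
		if ev.Archived {
			return nil, false // archived events are skipped
		}
		if ev.ModuleName != "CIP56.Events" || ev.TemplateName != "TokenTransferEvent" {
			return nil, false // not a TokenTransferEvent
		}
		if mode == filterModeWhitelist {
			if _, ok := allow[ev.Instrument]; !ok {
				return nil, false // filter miss
			}
		}
		if ev.Sender == "" && ev.Receiver == "" {
			return nil, false // both parties absent
		}
		return &parsedEvent{
			Sender:     ev.Sender,
			Receiver:   ev.Receiver,
			Instrument: ev.Instrument,
			Amount:     ev.Amount,
		}, true
	}
}

func main() {
	usdc := instrumentKey{Admin: "admin", ID: "USDC"}
	decode := newDecoder(filterModeWhitelist, []instrumentKey{usdc})
	ev := &ledgerEvent{
		ModuleName: "CIP56.Events", TemplateName: "TokenTransferEvent",
		Receiver: "alice", Instrument: usdc, Amount: "10",
	}
	parsed, ok := decode(ev)
	fmt.Println(parsed != nil, ok)
}
```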
Types ¶
type EventFetcher ¶
type EventFetcher interface {
// Start begins streaming from offset in a background goroutine.
// Must be called exactly once before Events is used.
Start(ctx context.Context, offset int64)
// Events returns the read-only channel of decoded batches.
// The channel is closed when the stream terminates.
Events() <-chan *streaming.Batch[any]
}
EventFetcher is the interface the Processor uses to start and consume the ledger stream.
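Because the Processor depends only on this interface, a test can substitute an in-memory implementation. The sketch below shows one possible shape. The batch type is a hypothetical stand-in for streaming.Batch[any], and the rule that only batches with an offset greater than the resume offset are delivered is an assumption made for the example.

```go
package main

import (
	"context"
	"fmt"
)

// batch is a hypothetical stand-in for streaming.Batch[any].
type batch struct {
	Offset int64
	Events []any
}

// fakeFetcher replays a fixed list of batches, then closes its channel.
type fakeFetcher struct {
	batches []*batch
	ch      chan *batch
}

func newFakeFetcher(batches ...*batch) *fakeFetcher {
	return &fakeFetcher{batches: batches, ch: make(chan *batch)}
}

// Start streams in a background goroutine. It must be called exactly once.
func (f *fakeFetcher) Start(ctx context.Context, offset int64) {
	go func() {
		defer close(f.ch) // closing signals that the stream terminated
		for _, b := range f.batches {
			if b.Offset <= offset {
				continue // assumed: resume offset is exclusive
			}
			select {
			case <-ctx.Done():
				return
			case f.ch <- b:
			}
		}
	}()
}

// Events returns the read-only channel of batches.
func (f *fakeFetcher) Events() <-chan *batch { return f.ch }

// consume starts the fetcher and collects the offsets it delivers.
func consume(f *fakeFetcher, resume int64) []int64 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	f.Start(ctx, resume)
	var seen []int64
	for b := range f.Events() {
		seen = append(seen, b.Offset)
	}
	return seen
}

func main() {
	f := newFakeFetcher(&batch{Offset: 1}, &batch{Offset: 2}, &batch{Offset: 3})
	fmt.Println(consume(f, 1))
}
```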
type Fetcher ¶
type Fetcher struct {
// contains filtered or unexported fields
}
Fetcher opens a live Canton stream from a caller-supplied resume offset and exposes the resulting batches via Events.
Typical usage:
decode := engine.NewMultiDecoder(transferDecode, offerDecode, holdingDecode)
f := engine.NewFetcher(streamClient, templateIDs, decode, logger)
f.Start(ctx, lastProcessedOffset)
for batch := range f.Events() { ... }
func NewFetcher ¶
func NewFetcher( streamer streaming.Streamer, templateIDs []streaming.TemplateID, decode func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (any, bool), logger *zap.Logger, ) *Fetcher
NewFetcher creates a new Fetcher.
- streamer: Canton streaming client (handles reconnection, auth, backoff)
- templateIDs: DAML templates to subscribe to
- decode: per-event decode function (see NewMultiDecoder)
- logger: caller-provided logger
type Metrics ¶
type Metrics struct {
// EventsProcessedTotal counts events successfully indexed, partitioned by
// event type (MINT, BURN, TRANSFER).
EventsProcessedTotal *prometheus.CounterVec
// BatchProcessingErrors counts batch processing failures that triggered a
// retry. Each increment represents one failed attempt, not one lost batch.
BatchProcessingErrors prometheus.Counter
// SyncLagSeconds reports how far behind real-time the indexer is, measured
// as time.Since(lastEvent.EffectiveTime). Updated after each non-empty batch.
SyncLagSeconds prometheus.Gauge
// LastOffset is the ledger offset of the most recently committed batch.
// Updated after every successful SaveOffset.
LastOffset prometheus.Gauge
}
Metrics holds Prometheus collectors for the indexer engine. Create with NewMetrics and inject into Processor via NewProcessor.
func NewMetrics ¶
func NewMetrics(reg sharedmetrics.NamespacedRegisterer) *Metrics
NewMetrics registers indexer engine metrics against the given registerer.
func NewNopMetrics ¶
func NewNopMetrics() *Metrics
NewNopMetrics returns a Metrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.
func (*Metrics) IncEventsProcessed ¶
IncEventsProcessed increments the events-processed counter for the given type.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is the main run loop of the indexer. It wires the EventFetcher to the Store and writes decoded events atomically.
Processing is sequential — one batch at a time. The ordering guarantee comes from the Canton ledger: transactions within a party's projection are delivered in strictly increasing offset order.
func NewProcessor ¶
func NewProcessor(fetcher EventFetcher, store Store, metrics *Metrics, logger *zap.Logger) *Processor
NewProcessor creates a Processor.
func (*Processor) Run ¶
Run starts the indexer loop. It blocks until ctx is canceled or the fetcher channel closes, then returns ctx.Err() or nil respectively.
On startup Run loads the resume offset from the store and passes it to the fetcher, so callers do not need to track offsets themselves.
If processBatch fails (store error) Run retries the same batch with exponential backoff (5s → 60s) until it succeeds or ctx is canceled. The offset is never advanced past a failed batch — no event is silently dropped.
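The retry behavior can be sketched as below. The documentation only states the 5s and 60s bounds, so the doubling factor is an assumption, and retry, nextBackoff, sleepCtx, and schedule are hypothetical names. The sleep function is injected so the example finishes instantly.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	initialBackoff = 5 * time.Second
	maxBackoff     = 60 * time.Second
)

// nextBackoff doubles the delay and caps it at maxBackoff.
// The doubling factor is an assumption, not documented behavior.
func nextBackoff(d time.Duration) time.Duration {
	d *= 2
	if d > maxBackoff {
		return maxBackoff
	}
	return d
}

// sleepCtx waits for d or until ctx is canceled, whichever comes first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}

// retry repeats op until it succeeds or sleep reports cancellation.
// It never moves on to other work while op keeps failing.
func retry(ctx context.Context, sleep func(context.Context, time.Duration) error, op func() error) error {
	delay := initialBackoff
	for {
		if err := op(); err == nil {
			return nil
		}
		if err := sleep(ctx, delay); err != nil {
			return err
		}
		delay = nextBackoff(delay)
	}
}

// schedule records the delays used when op fails the given number of times.
func schedule(failures int) []time.Duration {
	var delays []time.Duration
	remaining := failures
	record := func(_ context.Context, d time.Duration) error {
		delays = append(delays, d)
		return nil
	}
	op := func() error {
		if remaining > 0 {
			remaining--
			return errors.New("simulated store failure")
		}
		return nil
	}
	_ = retry(context.Background(), record, op)
	return delays
}

func main() {
	fmt.Println(schedule(5))
}
```

In real use, sleepCtx would be passed as the sleep argument so that cancellation interrupts the wait.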
type Store ¶
type Store interface {
// LatestOffset returns the last successfully persisted ledger offset.
// Returns 0 and no error when no offset has been stored yet (fresh start).
// Called once at startup, outside any transaction.
LatestOffset(ctx context.Context) (int64, error)
// RunInTx executes fn inside a single database transaction.
// If fn returns nil the transaction is committed.
// If fn returns an error the transaction is rolled back and the error is returned.
// The Store passed to fn is scoped to the transaction — all methods on it
// participate in the same underlying DB transaction.
RunInTx(ctx context.Context, fn func(ctx context.Context, tx Store) error) error
// InsertEvent persists one ParsedEvent, keyed by ContractID.
// Returns inserted=false when the event already exists, in which case the
// caller must not mutate any derived state a second time.
InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (inserted bool, err error)
// SaveOffset advances the stored ledger offset after all newly inserted events in
// the transaction have updated derived state. It must be safe to call even when the
// batch was empty or every event was already present.
SaveOffset(ctx context.Context, offset int64) error
// UpsertToken records a token deployment on first observation.
// Subsequent calls for the same {InstrumentAdmin, InstrumentID} are no-ops
// (ON CONFLICT DO NOTHING).
UpsertToken(ctx context.Context, token *indexer.Token) error
// ApplyBalanceDelta adjusts a party's token balance by delta (signed decimal string).
// The balance row is created at zero if it does not yet exist, then delta is added.
// The store must also update Token.HolderCount atomically:
// - increment when a party's balance transitions from zero to positive
// - decrement when a party's balance transitions from positive to zero
ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error
// ApplySupplyDelta adjusts a token's TotalSupply by delta (signed decimal string).
// Called once per mint (+amount) or burn (-amount). Transfer events must not call this.
ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error
// InsertPendingOffer records a new TransferOffer (idempotent by ContractID).
// Status is set to PENDING on insert.
InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error
// MarkOfferAccepted transitions a TransferOffer to ACCEPTED status when the Canton
// ledger emits an ARCHIVED event for the contract (receiver exercised Accept, or the
// offer was rejected/expired). The row is kept for audit history; no-op when not found.
MarkOfferAccepted(ctx context.Context, contractID string) error
// InsertHolding records an active Utility.Registry.Holding contract so its amount
// and owner can be recovered when the contract is later archived (archive events
// carry only the contract ID). Idempotent on ContractID.
InsertHolding(ctx context.Context, h *indexer.HoldingChange) error
// TakeHolding deletes the row for contractID and returns the stored owner/
// instrument/amount needed to decrement the matching balance on archive.
// Returns ok=false on missing rows so replayed ARCHIVED events become no-ops
// instead of errors.
TakeHolding(ctx context.Context, contractID string) (h indexer.HoldingChange, ok bool, err error)
}
Store defines the persistence contract for the indexer Processor.
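The holder-count rule in the ApplyBalanceDelta contract can be illustrated with exact decimal arithmetic. This is a sketch under assumptions: applyDelta is a hypothetical pure function rather than a Store method, math/big's Rat is one possible way to handle signed decimal strings, and errNegative stands in for ErrNegativeBalance.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// errNegative stands in for the package's ErrNegativeBalance.
var errNegative = errors.New("negative balance")

// applyDelta adds delta to balance (both signed decimal strings) and
// reports how the token's holder count should change: +1 when the balance
// moves from zero to positive, -1 when it moves from positive to zero.
func applyDelta(balance, delta string) (*big.Rat, int, error) {
	old, ok := new(big.Rat).SetString(balance)
	if !ok {
		return nil, 0, fmt.Errorf("invalid balance %q", balance)
	}
	d, ok := new(big.Rat).SetString(delta)
	if !ok {
		return nil, 0, fmt.Errorf("invalid delta %q", delta)
	}
	next := new(big.Rat).Add(old, d)
	if next.Sign() < 0 {
		return nil, 0, errNegative
	}
	holderChange := 0
	switch {
	case old.Sign() == 0 && next.Sign() > 0:
		holderChange = 1
	case old.Sign() > 0 && next.Sign() == 0:
		holderChange = -1
	}
	return next, holderChange, nil
}

func main() {
	next, change, err := applyDelta("0", "1.50")
	fmt.Println(next.FloatString(2), change, err)
}
```

In a real store both the balance update and the holder-count update would have to happen inside the same transaction to stay consistent.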
The key invariant: offset and events from the same LedgerTransaction must be written atomically. This guarantees that after a restart the processor resumes from a consistent point — no event is lost and no event is double-written.
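The invariant can be demonstrated with a toy in-memory store. The sketch is illustrative only: memStore, runInTx, insertEvent, and processBatch are hypothetical simplifications of the Store interface, and the copy-then-swap transaction is a stand-in for a real database transaction.

```go
package main

import (
	"errors"
	"fmt"
)

// memStore is a hypothetical in-memory stand-in for a Store.
type memStore struct {
	events map[string]bool
	offset int64
}

func newMemStore() *memStore { return &memStore{events: map[string]bool{}} }

// runInTx runs fn against a copy of the state and swaps the copy in only
// when fn returns nil, so events and offset commit or roll back together.
func (s *memStore) runInTx(fn func(tx *memStore) error) error {
	tx := &memStore{events: make(map[string]bool, len(s.events)), offset: s.offset}
	for id := range s.events {
		tx.events[id] = true
	}
	if err := fn(tx); err != nil {
		return err // rollback: the copy is discarded
	}
	*s = *tx
	return nil
}

// insertEvent is idempotent by contract ID, like Store.InsertEvent.
func (s *memStore) insertEvent(contractID string) bool {
	if s.events[contractID] {
		return false
	}
	s.events[contractID] = true
	return true
}

// processBatch writes the events and the offset in one transaction.
// failAfterInsert simulates a store error partway through the batch.
func processBatch(s *memStore, offset int64, contractIDs []string, failAfterInsert bool) error {
	return s.runInTx(func(tx *memStore) error {
		for _, id := range contractIDs {
			tx.insertEvent(id)
		}
		if failAfterInsert {
			return errors.New("simulated store failure")
		}
		tx.offset = offset
		return nil
	})
}

func main() {
	s := newMemStore()
	err := processBatch(s, 10, []string{"c1", "c2"}, true)
	fmt.Println(err != nil, s.offset, len(s.events))

	err = processBatch(s, 10, []string{"c1", "c2"}, false)
	fmt.Println(err, s.offset, len(s.events))
}
```

After the failed attempt neither the events nor the offset are visible, so retrying the same batch starts from a consistent state.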