engine

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 8, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var ErrNegativeBalance = errors.New("negative balance")

ErrNegativeBalance is returned by Store.ApplyBalanceDelta when the delta would make the balance negative. Exposed so the processor can distinguish incomplete cross-participant history from actual store errors.
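A minimal sketch of how a caller might tell this sentinel apart from other store failures using errors.Is. The applyDelta and classify helpers and the local ErrNegativeBalance variable are hypothetical stand-ins so the example compiles on its own; they are not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNegativeBalance is a local stand-in for engine.ErrNegativeBalance.
var ErrNegativeBalance = errors.New("negative balance")

// applyDelta is a hypothetical store call. It wraps the sentinel with %w so
// callers can still match it after context has been added.
func applyDelta(balance, delta int64) (int64, error) {
	if balance+delta < 0 {
		return balance, fmt.Errorf("apply delta %d: %w", delta, ErrNegativeBalance)
	}
	return balance + delta, nil
}

// classify shows one way a processor could branch on the error kind.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrNegativeBalance):
		return "incomplete-history"
	default:
		return "store-error"
	}
}

func main() {
	_, err := applyDelta(5, -10)
	fmt.Println(classify(err))
}
```

Matching with errors.Is rather than == keeps the check working when the store wraps the sentinel with extra context.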

Functions

func NewHoldingDecoder

func NewHoldingDecoder(
	packageID string, logger *zap.Logger,
) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.HoldingChange, bool)

NewHoldingDecoder returns a decode function for Utility.Registry.Holding.V0.Holding CREATED and ARCHIVED events. When packageID is empty the feature is disabled and the decode function returns nil, false. The decoder lets the indexer maintain indexer_balances for Utility.Registry instruments (e.g. USDCx), which do not emit a separate TokenTransferEvent contract.

The Holding template's create_arguments are {operator, provider, registrar, owner, instrument{source,id,scheme}, label, amount, lock}. The Splice HoldingV1 view derives instrumentId.admin from `registrar` and instrumentId.id from `instrument.id` — the decoder mirrors that mapping so balances keyed by (admin, id) line up with the per-instrument balance table.
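The field mapping described above can be sketched as follows. The holdingArgs, instrument, and instrumentKey types and the keyFor helper are simplified, hypothetical stand-ins (the lock field is omitted); the real decoder works on streaming.LedgerEvent values and its internals are not shown here.

```go
package main

import "fmt"

// instrument mirrors the nested instrument{source,id,scheme} record.
type instrument struct{ Source, ID, Scheme string }

// holdingArgs is a simplified view of the Holding create_arguments.
type holdingArgs struct {
	Operator, Provider, Registrar, Owner string
	Instrument                           instrument
	Label, Amount                        string
}

// instrumentKey is the (admin, id) pair that balances are keyed by.
type instrumentKey struct{ Admin, ID string }

// keyFor applies the mapping: admin comes from registrar, id from instrument.id.
func keyFor(h holdingArgs) instrumentKey {
	return instrumentKey{Admin: h.Registrar, ID: h.Instrument.ID}
}

func main() {
	h := holdingArgs{
		Registrar:  "registrar-party",
		Owner:      "alice",
		Instrument: instrument{Source: "src", ID: "USDCx", Scheme: "scheme"},
		Amount:     "12.50",
	}
	fmt.Printf("%+v\n", keyFor(h))
}
```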

func NewMultiDecoder

NewMultiDecoder wraps the TokenTransfer, Offer, and Holding decoders into a single any-typed decode function. The Holding decoder is only consulted when both prior decoders miss — TokenTransferEvents and TransferOffers never collide with Holding templates, so the order is purely a fast-path optimization for the common case.
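As an illustration of the ordering described above, the sketch below chains typed decoders into one any-typed function and returns the first hit. All names here (ledgerTx, ledgerEvent, adapt, firstMatch, and the three toy decoders) are assumptions made for the example; the actual NewMultiDecoder signature is not shown in this documentation.

```go
package main

import "fmt"

type ledgerTx struct{}
type ledgerEvent struct{ Template, ContractID string }

type transfer struct{ ContractID string }
type offer struct{ ContractID string }
type holding struct{ ContractID string }

// decodeFn is the any-typed decoder shape.
type decodeFn func(*ledgerTx, *ledgerEvent) (any, bool)

// adapt lifts a typed decoder into a decodeFn. On a miss it returns an
// untyped nil so callers never see a typed nil pointer inside the interface.
func adapt[T any](d func(*ledgerTx, *ledgerEvent) (*T, bool)) decodeFn {
	return func(t *ledgerTx, e *ledgerEvent) (any, bool) {
		v, ok := d(t, e)
		if !ok {
			return nil, false
		}
		return v, true
	}
}

// firstMatch consults the decoders in order and stops at the first hit.
func firstMatch(decoders ...decodeFn) decodeFn {
	return func(t *ledgerTx, e *ledgerEvent) (any, bool) {
		for _, d := range decoders {
			if v, ok := d(t, e); ok {
				return v, true
			}
		}
		return nil, false
	}
}

func decodeTransfer(_ *ledgerTx, e *ledgerEvent) (*transfer, bool) {
	if e.Template != "TokenTransferEvent" {
		return nil, false
	}
	return &transfer{ContractID: e.ContractID}, true
}

func decodeOffer(_ *ledgerTx, e *ledgerEvent) (*offer, bool) {
	if e.Template != "TransferOffer" {
		return nil, false
	}
	return &offer{ContractID: e.ContractID}, true
}

func decodeHolding(_ *ledgerTx, e *ledgerEvent) (*holding, bool) {
	if e.Template != "Holding" {
		return nil, false
	}
	return &holding{ContractID: e.ContractID}, true
}

// newDemoDecoder wires the three decoders in the documented order.
func newDemoDecoder() decodeFn {
	return firstMatch(adapt(decodeTransfer), adapt(decodeOffer), adapt(decodeHolding))
}

func main() {
	v, ok := newDemoDecoder()(&ledgerTx{}, &ledgerEvent{Template: "Holding", ContractID: "c1"})
	fmt.Printf("%T %v\n", v, ok)
}
```

Because the templates never overlap, reordering the decoders would change only how many checks a typical event pays for, not the result.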

func NewOfferDecoder

func NewOfferDecoder(
	packageID string, logger *zap.Logger,
) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.PendingOffer, bool)

NewOfferDecoder returns a decode function for TransferOffer CREATED and ARCHIVED events. When packageID is empty the feature is disabled and the decode function returns nil, false.

func NewTokenTransferDecoder

func NewTokenTransferDecoder(
	mode indexer.FilterMode,
	allowed []indexer.InstrumentKey,
	logger *zap.Logger,
) func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (*indexer.ParsedEvent, bool)

NewTokenTransferDecoder returns a decode function for use with streaming.NewStream.

The closure:

  • skips archived events
  • checks ModuleName == "CIP56.Events" && TemplateName == "TokenTransferEvent"
  • applies the instrument whitelist check when mode is FilterModeWhitelist
  • extracts all fields into a *ParsedEvent
  • returns nil, false for invalid events (both parties absent, filter miss)
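The checks listed above can be sketched as a closure over the filter configuration. This is an illustration under assumptions: the types below are local stand-ins rather than the real streaming and indexer types, filterAll is a hypothetical name for a non-whitelist mode, and "both parties absent" is modeled as empty From and To strings.

```go
package main

import "fmt"

type filterMode int

const (
	filterAll       filterMode = iota // hypothetical "no filtering" mode
	filterWhitelist                   // stands in for FilterModeWhitelist
)

type instrumentKey struct{ Admin, ID string }

type ledgerEvent struct {
	Archived                 bool
	ModuleName, TemplateName string
	From, To, Amount         string
	Instrument               instrumentKey
}

type parsedEvent struct {
	From, To, Amount string
	Instrument       instrumentKey
}

// newDecoder builds the whitelist set once and returns the per-event closure.
func newDecoder(mode filterMode, allowed []instrumentKey) func(*ledgerEvent) (*parsedEvent, bool) {
	allow := make(map[instrumentKey]struct{}, len(allowed))
	for _, k := range allowed {
		allow[k] = struct{}{}
	}
	return func(e *ledgerEvent) (*parsedEvent, bool) {
		if e.Archived {
			return nil, false // archived events are skipped
		}
		if e.ModuleName != "CIP56.Events" || e.TemplateName != "TokenTransferEvent" {
			return nil, false // not the template this decoder handles
		}
		if mode == filterWhitelist {
			if _, ok := allow[e.Instrument]; !ok {
				return nil, false // filter miss
			}
		}
		if e.From == "" && e.To == "" {
			return nil, false // invalid: both parties absent
		}
		return &parsedEvent{From: e.From, To: e.To, Amount: e.Amount, Instrument: e.Instrument}, true
	}
}

// sample returns an event that passes every check for the DEMO instrument.
func sample() ledgerEvent {
	return ledgerEvent{
		ModuleName:   "CIP56.Events",
		TemplateName: "TokenTransferEvent",
		From:         "alice",
		To:           "bob",
		Amount:       "1.0",
		Instrument:   instrumentKey{Admin: "admin", ID: "DEMO"},
	}
}

func main() {
	dec := newDecoder(filterWhitelist, []instrumentKey{{Admin: "admin", ID: "DEMO"}})
	e := sample()
	p, ok := dec(&e)
	fmt.Println(p != nil, ok)
}
```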

Types

type EventFetcher

type EventFetcher interface {
	// Start begins streaming from offset in a background goroutine.
	// Must be called exactly once before Events is used.
	Start(ctx context.Context, offset int64)

	// Events returns the read-only channel of decoded batches.
	// The channel is closed when the stream terminates.
	Events() <-chan *streaming.Batch[any]
}

EventFetcher is the interface the Processor uses to start and consume the ledger stream.

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

Fetcher opens a live Canton stream from a caller-supplied resume offset and exposes the resulting batches via Events.

Typical usage:

decode := engine.NewMultiDecoder(transferDecode, offerDecode, holdingDecode)
f := engine.NewFetcher(streamClient, templateIDs, decode, logger)
f.Start(ctx, lastProcessedOffset)
for batch := range f.Events() { ... }

func NewFetcher

func NewFetcher(
	streamer streaming.Streamer,
	templateIDs []streaming.TemplateID,
	decode func(*streaming.LedgerTransaction, *streaming.LedgerEvent) (any, bool),
	logger *zap.Logger,
) *Fetcher

NewFetcher creates a new Fetcher.

  • streamer: Canton streaming client (handles reconnection, auth, backoff)
  • templateIDs: DAML templates to subscribe to
  • decode: per-event decode function (see NewMultiDecoder)
  • logger: caller-provided logger

func (*Fetcher) Events

func (f *Fetcher) Events() <-chan *streaming.Batch[any]

Events returns the read-only channel of decoded batches. Must be called after Start. The channel is closed when the stream terminates.

func (*Fetcher) Start

func (f *Fetcher) Start(ctx context.Context, offset int64)

Start begins streaming from offset. It is non-blocking; the underlying goroutine exits when ctx is canceled or the stream closes.

Start must be called exactly once before Events is used. Subsequent calls are no-ops.
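One common way to get the "first call starts, later calls are no-ops" behavior is sync.Once. The sketch below is an assumption about the pattern, not the Fetcher's actual implementation; the fetcher type emits three fake offsets instead of ledger batches, and runDemo is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// fetcher is a toy stand-in that emits offsets instead of decoded batches.
type fetcher struct {
	once sync.Once
	out  chan int64
}

func newFetcher() *fetcher { return &fetcher{out: make(chan int64)} }

// Start launches the producer goroutine on the first call only.
func (f *fetcher) Start(ctx context.Context, offset int64) {
	f.once.Do(func() {
		go func() {
			defer close(f.out) // closing signals stream termination to the consumer
			for next := offset + 1; next <= offset+3; next++ {
				select {
				case f.out <- next:
				case <-ctx.Done():
					return
				}
			}
		}()
	})
}

// Events exposes the channel as receive-only.
func (f *fetcher) Events() <-chan int64 { return f.out }

// runDemo calls Start twice and drains the channel until it is closed.
func runDemo() []int64 {
	f := newFetcher()
	ctx := context.Background()
	f.Start(ctx, 10)
	f.Start(ctx, 99) // ignored: the stream already started from offset 10
	var got []int64
	for v := range f.Events() {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(runDemo())
}
```

Guarding Start this way also guarantees the channel is closed exactly once, so ranging over Events terminates cleanly.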

type Metrics

type Metrics struct {

	// EventsProcessedTotal counts events successfully indexed, partitioned by
	// event type (MINT, BURN, TRANSFER).
	EventsProcessedTotal *prometheus.CounterVec

	// BatchProcessingErrors counts batch processing failures that triggered a
	// retry. Each increment represents one failed attempt, not one lost batch.
	BatchProcessingErrors prometheus.Counter

	// SyncLagSeconds reports how far behind real-time the indexer is, measured
	// as time.Since(lastEvent.EffectiveTime). Updated after each non-empty batch.
	SyncLagSeconds prometheus.Gauge

	// LastOffset is the ledger offset of the most recently committed batch.
	// Updated after every successful SaveOffset.
	LastOffset prometheus.Gauge
}

Metrics holds Prometheus collectors for the indexer engine. Create with NewMetrics and inject into Processor via NewProcessor.

func NewMetrics

NewMetrics registers indexer engine metrics against the given registerer.

func NewNopMetrics

func NewNopMetrics() *Metrics

NewNopMetrics returns a Metrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.

func (*Metrics) IncEventsProcessed

func (m *Metrics) IncEventsProcessed(eventType indexer.EventType)

IncEventsProcessed increments the events-processed counter for the given type.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is the main run loop of the indexer. It wires the EventFetcher to the Store and writes decoded events atomically.

Processing is sequential — one batch at a time. The ordering guarantee comes from the Canton ledger: transactions within a party's projection are delivered in strictly increasing offset order.

func NewProcessor

func NewProcessor(fetcher EventFetcher, store Store, metrics *Metrics, logger *zap.Logger) *Processor

NewProcessor creates a Processor.

func (*Processor) Run

func (p *Processor) Run(ctx context.Context) error

Run starts the indexer loop. It blocks until ctx is canceled or the fetcher channel closes, then returns ctx.Err() or nil respectively.

On startup Run loads the resume offset from the store and passes it to the fetcher, so callers do not need to track offsets themselves.

If processBatch fails (store error) Run retries the same batch with exponential backoff (5s → 60s) until it succeeds or ctx is canceled. The offset is never advanced past a failed batch — no event is silently dropped.
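The retry behavior can be sketched as a loop with a capped delay. The 5s floor and 60s ceiling come from the text above; the doubling factor and the helper names (nextBackoff, schedule, retry, sleepCtx) are assumptions made for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

const (
	initialBackoff = 5 * time.Second
	maxBackoff     = 60 * time.Second
)

// nextBackoff doubles the delay and caps it at maxBackoff (doubling is assumed).
func nextBackoff(d time.Duration) time.Duration {
	d *= 2
	if d > maxBackoff {
		return maxBackoff
	}
	return d
}

// schedule lists the first n delays the loop would wait.
func schedule(n int) []time.Duration {
	out := make([]time.Duration, 0, n)
	d := initialBackoff
	for i := 0; i < n; i++ {
		out = append(out, d)
		d = nextBackoff(d)
	}
	return out
}

// sleepCtx waits for d or returns early with ctx.Err() on cancellation.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}

// retry re-runs fn on the same batch until it succeeds or sleep reports
// cancellation. It returns the number of attempts made.
func retry(ctx context.Context, fn func() error, sleep func(context.Context, time.Duration) error) (int, error) {
	delay := initialBackoff
	for attempts := 1; ; attempts++ {
		if fn() == nil {
			return attempts, nil
		}
		if err := sleep(ctx, delay); err != nil {
			return attempts, err
		}
		delay = nextBackoff(delay)
	}
}

func main() {
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errors.New("store unavailable")
		}
		return nil
	}
	// A fake sleep keeps the demo instant; pass sleepCtx for real waiting.
	noWait := func(context.Context, time.Duration) error { return nil }
	attempts, err := retry(context.Background(), flaky, noWait)
	fmt.Println(attempts, err, schedule(6))
	_ = sleepCtx
}
```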

type Store

type Store interface {
	// LatestOffset returns the last successfully persisted ledger offset.
	// Returns 0 and no error when no offset has been stored yet (fresh start).
	// Called once at startup, outside any transaction.
	LatestOffset(ctx context.Context) (int64, error)

	// RunInTx executes fn inside a single database transaction.
	// When fn returns nil the transaction is committed.
	// On any error the transaction is rolled back and the error is returned.
	// The Store passed to fn is scoped to the transaction — all methods on it
	// participate in the same underlying DB transaction.
	RunInTx(ctx context.Context, fn func(ctx context.Context, tx Store) error) error

	// InsertEvent persists one ParsedEvent by ContractID.
	// Returns inserted=false when the event already exists and should therefore not
	// mutate any derived state a second time.
	InsertEvent(ctx context.Context, event *indexer.ParsedEvent) (inserted bool, err error)

	// SaveOffset advances the stored ledger offset after all newly inserted events in
	// the transaction have updated derived state. It must be safe to call even when the
	// batch was empty or every event was already present.
	SaveOffset(ctx context.Context, offset int64) error

	// UpsertToken records a token deployment on first observation.
	// Subsequent calls for the same {InstrumentAdmin, InstrumentID} are no-ops
	// (ON CONFLICT DO NOTHING).
	UpsertToken(ctx context.Context, token *indexer.Token) error

	// ApplyBalanceDelta adjusts a party's token balance by delta (signed decimal string).
	// The balance row is created at zero if it does not yet exist, then delta is added.
	// The store must also update Token.HolderCount atomically:
	//   - increment when a party's balance transitions from zero to positive
	//   - decrement when a party's balance transitions from positive to zero
	ApplyBalanceDelta(ctx context.Context, partyID, instrumentAdmin, instrumentID, delta string) error

	// ApplySupplyDelta adjusts a token's TotalSupply by delta (signed decimal string).
	// Called once per mint (+amount) or burn (-amount). Transfer events must not call this.
	ApplySupplyDelta(ctx context.Context, instrumentAdmin, instrumentID, delta string) error

	// InsertPendingOffer records a new TransferOffer (idempotent by ContractID).
	// Status is set to PENDING on insert.
	InsertPendingOffer(ctx context.Context, offer *indexer.PendingOffer) error

	// MarkOfferAccepted transitions a TransferOffer to ACCEPTED status when the Canton
	// ledger emits an ARCHIVED event for the contract (receiver exercised Accept, or the
	// offer was rejected/expired). The row is kept for audit history; no-op when not found.
	MarkOfferAccepted(ctx context.Context, contractID string) error

	// InsertHolding records an active Utility.Registry.Holding contract so its amount
	// and owner can be recovered when the contract is later archived (archive events
	// carry only the contract ID). Idempotent on ContractID.
	InsertHolding(ctx context.Context, h *indexer.HoldingChange) error

	// TakeHolding deletes the row for contractID and returns the stored owner/
	// instrument/amount needed to decrement the matching balance on archive.
	// Returns ok=false on missing rows so replayed ARCHIVED events become no-ops
	// instead of errors.
	TakeHolding(ctx context.Context, contractID string) (h indexer.HoldingChange, ok bool, err error)
}

Store defines the persistence contract for the indexer Processor.
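To make the ApplyBalanceDelta contract concrete, here is a minimal in-memory sketch of the holder-count transitions and the negative-balance check. The memStore type is hypothetical, ignores context and transactions, is not safe for concurrent use, and parses the decimal string with math/big; a real Store would do this inside the database.

```go
package main

import (
	"errors"
	"fmt"
	"math/big"
)

// errNegativeBalance stands in for engine.ErrNegativeBalance.
var errNegativeBalance = errors.New("negative balance")

type balanceKey struct{ Party, Admin, ID string }
type tokenKey struct{ Admin, ID string }

type memStore struct {
	balances map[balanceKey]*big.Rat
	holders  map[tokenKey]int
}

func newMemStore() *memStore {
	return &memStore{
		balances: make(map[balanceKey]*big.Rat),
		holders:  make(map[tokenKey]int),
	}
}

// ApplyBalanceDelta adds a signed decimal delta to the party's balance and
// keeps the per-token holder count in step with zero/positive transitions.
func (s *memStore) ApplyBalanceDelta(party, admin, id, delta string) error {
	d, ok := new(big.Rat).SetString(delta)
	if !ok {
		return fmt.Errorf("invalid delta %q", delta)
	}
	k := balanceKey{Party: party, Admin: admin, ID: id}
	old, exists := s.balances[k]
	if !exists {
		old = new(big.Rat) // the row starts at zero
	}
	next := new(big.Rat).Add(old, d)
	if next.Sign() < 0 {
		return errNegativeBalance // nothing is written on failure
	}
	s.balances[k] = next
	tk := tokenKey{Admin: admin, ID: id}
	switch {
	case old.Sign() == 0 && next.Sign() > 0:
		s.holders[tk]++ // zero -> positive
	case old.Sign() > 0 && next.Sign() == 0:
		s.holders[tk]-- // positive -> zero
	}
	return nil
}

func main() {
	s := newMemStore()
	fmt.Println(s.ApplyBalanceDelta("alice", "admin", "DEMO", "10.5"))
	fmt.Println(s.holders[tokenKey{Admin: "admin", ID: "DEMO"}])
}
```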

The key invariant: offset and events from the same LedgerTransaction must be written atomically. This guarantees that after a restart the processor resumes from a consistent point — no event is lost and no event is double-written.
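The invariant can be illustrated with a toy store whose transaction works on a copy and swaps it in only on success. Everything here (state, store, runInTx, processBatch) is a hypothetical simplification: it tracks only a supply counter as derived state and omits context handling.

```go
package main

import (
	"errors"
	"fmt"
)

type event struct {
	ContractID string
	Amount     int64
}

// state is everything a transaction may touch.
type state struct {
	events map[string]bool
	supply int64
	offset int64
}

func (s *state) clone() *state {
	c := &state{events: make(map[string]bool, len(s.events)), supply: s.supply, offset: s.offset}
	for k, v := range s.events {
		c.events[k] = v
	}
	return c
}

type store struct{ committed *state }

func newStore() *store { return &store{committed: &state{events: map[string]bool{}}} }

// runInTx runs fn against a copy; the copy replaces the committed state only
// when fn returns nil, so a failure leaves no partial writes behind.
func (st *store) runInTx(fn func(tx *state) error) error {
	work := st.committed.clone()
	if err := fn(work); err != nil {
		return err
	}
	st.committed = work
	return nil
}

// processBatch writes events, derived state, and the offset in one transaction.
func processBatch(st *store, offset int64, events []event) error {
	return st.runInTx(func(tx *state) error {
		for _, e := range events {
			if e.ContractID == "" {
				return errors.New("event without contract ID")
			}
			if tx.events[e.ContractID] {
				continue // already inserted: do not mutate derived state again
			}
			tx.events[e.ContractID] = true
			tx.supply += e.Amount
		}
		tx.offset = offset // saved last, and also for empty or replayed batches
		return nil
	})
}

func main() {
	st := newStore()
	_ = processBatch(st, 1, []event{{ContractID: "c1", Amount: 10}})
	_ = processBatch(st, 1, []event{{ContractID: "c1", Amount: 10}}) // replay
	err := processBatch(st, 2, []event{{ContractID: "c2", Amount: 5}, {}})
	fmt.Println(st.committed.supply, st.committed.offset, err != nil)
}
```

Replaying a batch leaves derived state unchanged, and a failed batch rolls back both its events and its offset, which is what lets a restart resume from the stored offset.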
