custodial

package
v0.5.2
Warning

This package is not in the latest version of its module.
Published: Jun 10, 2026 | License: Apache-2.0 | Imports: 10 | Imported by: 0

Documentation

Overview

Package custodial implements background automation for custodial Canton parties.

Types

type AcceptWorker

type AcceptWorker struct {
	// contains filtered or unexported fields
}

AcceptWorker polls the indexer for all pending TransferOffers and automatically accepts them on behalf of registered custodial parties.

It runs as a background goroutine and stops when ctx is canceled. Each poll cycle fetches all PENDING offers in one paginated stream and checks each offer's receiver against an in-memory map of custodial party IDs built from a single ListUsers call. The cost per cycle is therefore one DB round-trip plus P indexer page fetches, regardless of how many custodial users exist.

A custodial user registered after the ListUsers call at the start of a cycle is caught on the next tick — at most one poll-interval delay.
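The cycle described above can be sketched roughly as follows. This is a minimal illustration and not the package's implementation: Offer, custodialSet, and acceptCycle are hypothetical names, pages are passed in as plain slices instead of being streamed from the indexer, and the accept callback stands in for the Canton RPC.

```go
package main

import (
	"errors"
	"fmt"
)

// Offer is a hypothetical stand-in for a pending TransferOffer.
type Offer struct {
	ID       string
	Receiver string
}

// custodialSet builds the in-memory lookup from a single user listing.
func custodialSet(partyIDs []string) map[string]struct{} {
	set := make(map[string]struct{}, len(partyIDs))
	for _, id := range partyIDs {
		set[id] = struct{}{}
	}
	return set
}

// acceptCycle walks every page of pending offers and calls accept only for
// offers whose receiver is in the custodial set. A failed accept is counted
// and the loop moves on to the next offer.
func acceptCycle(partyIDs []string, pages [][]Offer, accept func(Offer) error) (accepted, failed, skipped int) {
	set := custodialSet(partyIDs)
	for _, page := range pages {
		for _, o := range page {
			if _, ok := set[o.Receiver]; !ok {
				skipped++ // receiver is not a custodial party
				continue
			}
			if err := accept(o); err != nil {
				failed++
				continue
			}
			accepted++
		}
	}
	return accepted, failed, skipped
}

func main() {
	pages := [][]Offer{
		{{ID: "o1", Receiver: "alice"}, {ID: "o2", Receiver: "mallory"}},
		{{ID: "o3", Receiver: "bob"}},
	}
	accept := func(o Offer) error {
		if o.ID == "o3" {
			return errors.New("simulated RPC failure")
		}
		return nil
	}
	a, f, s := acceptCycle([]string{"alice", "bob"}, pages, accept)
	fmt.Println(a, f, s) // prints "1 1 1"
}
```

The per-offer check is a map lookup, so the number of custodial users affects only the size of the set, not the number of round-trips.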

func NewAcceptWorker

func NewAcceptWorker(
	cantonToken cantontkn.Token,
	userLister UserLister,
	indexerClient indexerclient.Client,
	pollInterval time.Duration,
	metrics *Metrics,
	logger *zap.Logger,
) *AcceptWorker

NewAcceptWorker creates a new AcceptWorker.

metrics receives Prometheus observations for cycle duration, per-phase errors, and per-offer accept outcomes. Pass NewNopMetrics() in tests where metric values aren't asserted.

func (*AcceptWorker) Run

func (w *AcceptWorker) Run(ctx context.Context) error

Run starts the accept worker loop. It blocks until ctx is canceled.

type AcceptWorkerConfig

type AcceptWorkerConfig struct {
	// IndexerURL is the base URL of the indexer service (e.g. "http://localhost:8081").
	// Required when the worker is enabled.
	IndexerURL   string        `yaml:"indexer_url" validate:"required"`
	PollInterval time.Duration `yaml:"poll_interval" default:"10s"`
}

AcceptWorkerConfig configures the background worker that auto-accepts inbound USDCx TransferOffers for custodial parties. Omitting this block disables the worker.

type Metrics

type Metrics struct {
	// RunsTotal counts every invocation of acceptPending — successful, empty,
	// and errored alike. Pair with rate() to check the worker's tick cadence.
	RunsTotal prometheus.Counter

	// RunDuration is the per-cycle wall-clock duration including all paginated
	// indexer fetches and all per-offer accept calls in that cycle.
	RunDuration prometheus.Histogram

	// ErrorsTotal counts cycles that aborted early.
	//   phase=list_users   – ListCustodialUsers failed
	//   phase=fetch_offers – GetAllPendingOffers failed on some page
	// Per-offer accept failures are *not* counted here (see OffersAcceptedTotal).
	ErrorsTotal *prometheus.CounterVec

	// CustodialUsers is the last observed count of users returned by
	// ListCustodialUsers — the size of the operating set the worker checks
	// each cycle.
	CustodialUsers prometheus.Gauge

	// PendingOffers is the last observed total count of pending TransferOffers
	// from the indexer (taken from result.Total on the first page). Operational
	// signal: if this grows unbounded the worker is falling behind.
	PendingOffers prometheus.Gauge

	// OffersFetchedTotal is the running count of offers the indexer has
	// returned across all pages and cycles, regardless of receiver. Counts
	// indexer throughput from the worker's perspective.
	OffersFetchedTotal prometheus.Counter

	// OffersAcceptedTotal counts per-offer accept attempts. Only offers whose
	// receiver matches a custodial party are counted; others are skipped
	// without incrementing. The result label is "success" or "error".
	OffersAcceptedTotal *prometheus.CounterVec

	// OfferAcceptDuration is the per-AcceptTransferInstruction call latency.
	// Isolated from RunDuration so dashboards can show Canton RPC cost
	// separately from cycle orchestration cost.
	OfferAcceptDuration prometheus.Histogram

	// LastSuccessfulRunTimestamp is the UNIX timestamp of the most recent
	// cycle that completed without an abort-on-error. Powers a staleness
	// alert: if (time() - this) >> pollInterval, the worker is stuck.
	LastSuccessfulRunTimestamp prometheus.Gauge
}

Metrics holds Prometheus collectors for the AcceptWorker.

The worker's cycle ("acceptPending") has three observable phases:

  1. Listing custodial users (one DB call to UserLister).
  2. Paginating pending TransferOffers from the indexer (N indexer calls).
  3. Accepting each custodial-owned offer (M Canton RPC calls).

Phase-1 and phase-2 failures abort the cycle and are counted in ErrorsTotal with the appropriate phase label. Phase-3 (per-offer) failures are *not* fatal — the worker logs and continues to the next offer — so they live in their own OffersAcceptedTotal{result="error"} series rather than the cycle-level ErrorsTotal.

func NewMetrics

NewMetrics registers AcceptWorker metrics against the given registerer.

func NewNopMetrics

func NewNopMetrics() *Metrics

NewNopMetrics returns a Metrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.

type UserLister

type UserLister interface {
	ListCustodialUsers(ctx context.Context) ([]*user.User, error)
}

UserLister is the narrow user-store interface the AcceptWorker needs.
