Overview ¶
Package custodial implements background automation for custodial Canton parties.
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AcceptWorker ¶
type AcceptWorker struct {
// contains filtered or unexported fields
}
AcceptWorker polls the indexer for all pending TransferOffers and automatically accepts them on behalf of registered custodial parties.
It runs as a background goroutine and stops when ctx is canceled. Each poll cycle fetches all PENDING offers in one paginated stream and checks each receiver against an in-memory map of custodial party IDs built from a single ListUsers call. A cycle therefore costs one DB round-trip plus P indexer page fetches, regardless of how many custodial users exist.
A custodial user registered after the ListUsers call at the start of a cycle is picked up on the next tick, so the delay is at most one poll interval.
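The receiver check described above can be sketched as a set lookup. This is a minimal illustration, not the package's implementation: the Offer type, its fields, and both helper functions are hypothetical names introduced here.

```go
package main

import "fmt"

// Offer is a hypothetical stand-in for a pending TransferOffer returned by
// the indexer; the real record type is not shown in this documentation.
type Offer struct {
	ContractID string
	Receiver   string
}

// custodialSet builds a set of party IDs so each receiver check is a single
// map lookup instead of a scan over all users.
func custodialSet(partyIDs []string) map[string]struct{} {
	set := make(map[string]struct{}, len(partyIDs))
	for _, id := range partyIDs {
		set[id] = struct{}{}
	}
	return set
}

// filterCustodial keeps only the offers whose receiver is in the set.
func filterCustodial(offers []Offer, custodial map[string]struct{}) []Offer {
	var matched []Offer
	for _, o := range offers {
		if _, ok := custodial[o.Receiver]; ok {
			matched = append(matched, o)
		}
	}
	return matched
}

func main() {
	set := custodialSet([]string{"alice", "bob"})
	offers := []Offer{
		{ContractID: "c1", Receiver: "alice"},
		{ContractID: "c2", Receiver: "external"},
	}
	for _, o := range filterCustodial(offers, set) {
		fmt.Println("would accept", o.ContractID)
	}
}
```

Because the set is rebuilt once per cycle, the per-offer cost stays constant as the number of custodial users grows.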
func NewAcceptWorker ¶
func NewAcceptWorker(
	cantonToken cantontkn.Token,
	userLister UserLister,
	indexerClient indexerclient.Client,
	pollInterval time.Duration,
	metrics *Metrics,
	logger *zap.Logger,
) *AcceptWorker
NewAcceptWorker creates a new AcceptWorker.
metrics receives Prometheus observations for cycle duration, per-phase errors, and per-offer accept outcomes. Pass NewNopMetrics() in tests where metric values aren't asserted.
func (*AcceptWorker) Run ¶
func (w *AcceptWorker) Run(ctx context.Context)
Run starts the accept worker loop. It blocks until ctx is canceled.
type AcceptWorkerConfig ¶
type AcceptWorkerConfig struct {
// IndexerURL is the base URL of the indexer service (e.g. "http://localhost:8081").
// Required when the worker is enabled.
IndexerURL string `yaml:"indexer_url" validate:"required"`
PollInterval time.Duration `yaml:"poll_interval" default:"10s"`
}
AcceptWorkerConfig configures the background worker that auto-accepts inbound USDCx TransferOffers for custodial parties. Omitting this block disables the worker.
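Based on the yaml tags above, a config block enabling the worker might look like the following. The parent key accept_worker is an assumption, since the enclosing config struct is not shown here, and the duration syntax depends on the YAML loader in use.

```yaml
# "accept_worker" is a hypothetical parent key; the real key is defined by
# the enclosing config struct, which is not part of this documentation.
accept_worker:
  indexer_url: "http://localhost:8081" # required when the block is present
  poll_interval: 10s                   # falls back to 10s when omitted
```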
type Metrics ¶
type Metrics struct {
// RunsTotal counts every invocation of acceptPending — successful, empty,
// and errored alike. Pair with rate() to check the worker's tick cadence.
RunsTotal prometheus.Counter
// RunDuration is the per-cycle wall-clock duration including all paginated
// indexer fetches and all per-offer accept calls in that cycle.
RunDuration prometheus.Histogram
// ErrorsTotal counts cycles that aborted early.
// phase=list_users – ListCustodialUsers failed
// phase=fetch_offers – GetAllPendingOffers failed on some page
// Per-offer accept failures are *not* counted here (see OffersAcceptedTotal).
ErrorsTotal *prometheus.CounterVec
// CustodialUsers is the last observed count of users returned by
// ListCustodialUsers — the size of the operating set the worker checks
// each cycle.
CustodialUsers prometheus.Gauge
// PendingOffers is the last observed total count of pending TransferOffers
// from the indexer (taken from result.Total on the first page). Operational
// signal: if this grows unbounded the worker is falling behind.
PendingOffers prometheus.Gauge
// OffersFetchedTotal is the running count of offers the indexer has
// returned across all pages and cycles, regardless of receiver. Counts
// indexer throughput from the worker's perspective.
OffersFetchedTotal prometheus.Counter
// OffersAcceptedTotal counts per-offer accept attempts. Only offers whose
// receiver matches a custodial party are counted; others are skipped
// without incrementing. The result label is "success" or "error".
OffersAcceptedTotal *prometheus.CounterVec
// OfferAcceptDuration is the per-AcceptTransferInstruction call latency.
// Isolated from RunDuration so dashboards can show Canton RPC cost
// separately from cycle orchestration cost.
OfferAcceptDuration prometheus.Histogram
// LastSuccessfulRunTimestamp is the UNIX timestamp of the most recent
// cycle that completed without an abort-on-error. Powers a staleness
// alert: if (time() - this) >> pollInterval, the worker is stuck.
LastSuccessfulRunTimestamp prometheus.Gauge
}
Metrics holds Prometheus collectors for the AcceptWorker.
The worker's cycle ("acceptPending") has three observable phases:
 1. Listing custodial users (one DB call to UserLister).
 2. Paginating pending TransferOffers from the indexer (N indexer calls).
 3. Accepting each custodial-owned offer (M Canton RPC calls).
Phase-1 and phase-2 failures abort the cycle and are counted in ErrorsTotal with the appropriate phase label. Phase-3 (per-offer) failures are *not* fatal — the worker logs and continues to the next offer — so they live in their own OffersAcceptedTotal{result="error"} series rather than the cycle-level ErrorsTotal.
func NewMetrics ¶
func NewMetrics(reg sharedmetrics.NamespacedRegisterer) *Metrics
NewMetrics registers AcceptWorker metrics against the given registerer.
func NewNopMetrics ¶
func NewNopMetrics() *Metrics
NewNopMetrics returns a Metrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.