Documentation ¶
Overview ¶
Package submitter drains pending mempool entries by submitting the corresponding ERC-20 transfers to Canton. It is the asynchronous counterpart to ethrpc.service.SendRawTransaction: that handler records intent and returns the tx hash immediately, while this worker transitions each entry to completed (Canton accepted the transfer) or failed (Canton rejected it). The miner then seals the terminal entry into a synthetic EVM block, so eth_getTransactionReceipt returns a receipt with status=0x1 (completed) or status=0x0 (failed) as usual.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Metrics ¶
type Metrics struct {
	// DrainDuration tracks the wall-clock duration of one drain tick, including
	// empty ticks. Successful and errored ticks both contribute so the
	// histogram reflects the loop's actual cadence.
	DrainDuration prometheus.Histogram

	// DrainErrorsTotal counts drain ticks that aborted because the store fetch
	// for pending entries failed. Successful and empty ticks are not counted.
	DrainErrorsTotal prometheus.Counter

	// EntriesFetched is the distribution of pending-entry counts returned by
	// one drain tick. Compare against the configured batch size to detect a
	// backlog that's pinned at the cap.
	EntriesFetched prometheus.Histogram

	// PendingBacklog is the last observed count of pending mempool entries
	// returned by the store. Updated each drain after the fetch; reflects the
	// queue depth at sample time, not a running total.
	PendingBacklog prometheus.Gauge

	// EntriesProcessedTotal counts terminal per-entry outcomes:
	//   completed        – Canton accepted, mempool row → completed
	//   failed_permanent – permanent (client-side) error, mempool row → failed
	//   transient_retry  – transient error, row left pending for next tick
	EntriesProcessedTotal *prometheus.CounterVec

	// EntryProcessDuration is per-entry total processing time, labeled by the
	// same outcome as EntriesProcessedTotal. Includes the Canton call plus the
	// surrounding orchestration and any follow-up DB write.
	EntryProcessDuration *prometheus.HistogramVec

	// CantonTransferDuration isolates the Canton TransferFrom RPC latency from
	// the rest of process(). Buckets extend to cantonCallTimeout (60s) so
	// budget consumption is visible at the high end.
	CantonTransferDuration *prometheus.HistogramVec

	// CantonTransferErrorsTotal counts Canton call errors classified by the
	// apperr.Category taxonomy, plus a "timeout" / "transient" fallback for
	// uncategorised cases. Mirrors the labels used elsewhere in
	// canton-middleware so dashboards stay consistent.
	CantonTransferErrorsTotal *prometheus.CounterVec

	// DBWriteErrorsTotal counts post-Canton store writes that failed.
	//   kind=complete – CompleteMempoolEntry failed
	//   kind=fail     – FailMempoolEntry failed
	// Both leave the entry in an inconsistent state; this surfaces what the
	// existing log.Error sites do not.
	DBWriteErrorsTotal *prometheus.CounterVec

	// WorkerPoolBlockedTotal counts the times the drain loop had to block on a
	// saturated worker pool before launching the next goroutine. Non-zero
	// means concurrency is undersized for the arrival rate.
	WorkerPoolBlockedTotal prometheus.Counter

	// LastSuccessfulDrainTimestamp is the UNIX timestamp of the most recent
	// drain tick that completed without a store error. Powers a staleness
	// alert: if (time() - this) > threshold, the submitter is stuck.
	LastSuccessfulDrainTimestamp prometheus.Gauge
}
Metrics holds Prometheus collectors for the ethrpc submitter.
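To make the WorkerPoolBlockedTotal description concrete, here is one way a drain loop could detect that it had to wait for a worker slot. This is a minimal sketch under assumptions, not the package's implementation: the pool type, newPool, launch, and the onBlocked callback (standing in for WorkerPoolBlockedTotal.Inc()) are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical bounded worker pool built on a buffered channel.
type pool struct {
	sem       chan struct{} // one token per running worker
	onBlocked func()        // stand-in for WorkerPoolBlockedTotal.Inc()
	wg        sync.WaitGroup
}

func newPool(width int, onBlocked func()) *pool {
	return &pool{sem: make(chan struct{}, width), onBlocked: onBlocked}
}

// launch runs fn in a goroutine once a slot is free. It first tries a
// non-blocking acquire; only if that fails does it report the block and
// then wait for a slot.
func (p *pool) launch(fn func()) {
	select {
	case p.sem <- struct{}{}:
		// A slot was free, nothing to report.
	default:
		p.onBlocked()
		p.sem <- struct{}{} // wait until a running worker releases its slot
	}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { <-p.sem }() // release the slot when fn returns
		fn()
	}()
}

func main() {
	release := make(chan struct{})
	blocked := 0
	p := newPool(1, func() {
		blocked++
		close(release) // let the first job finish so the slot frees up
	})

	p.launch(func() { <-release }) // occupies the only slot
	p.launch(func() {})            // finds the pool saturated, so onBlocked fires
	p.wg.Wait()

	fmt.Println("blocked:", blocked) // blocked: 1
}
```

Trying the non-blocking acquire first is what lets the counter stay at zero when concurrency is adequately sized, which matches the "non-zero means undersized" reading of the metric.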
func NewMetrics ¶
func NewMetrics(reg sharedmetrics.NamespacedRegisterer) *Metrics
NewMetrics registers ethrpc submitter metrics against the given registerer.
func NewNopMetrics ¶
func NewNopMetrics() *Metrics
NewNopMetrics returns a Metrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.
type Store ¶
type Store interface {
	// GetMempoolEntriesByStatus returns up to limit entries with the given
	// status, ordered by insertion ID. A limit of 0 means unlimited; the
	// submitter passes its batch size so a backlog never loads the entire
	// pending queue into memory.
	GetMempoolEntriesByStatus(ctx context.Context, status ethrpc.MempoolStatus, limit int) ([]ethrpc.MempoolEntry, error)

	// CompleteMempoolEntry moves the entry with the given tx hash to completed.
	CompleteMempoolEntry(ctx context.Context, txHash []byte) error

	// FailMempoolEntry moves the entry with the given tx hash to failed;
	// errMsg carries the failure reason.
	FailMempoolEntry(ctx context.Context, txHash []byte, errMsg string) error
}
Store is the narrow data-access interface the submitter needs.
type Submitter ¶
type Submitter struct {
	// contains filtered or unexported fields
}
Submitter polls pending mempool entries and pushes them through Canton.
func New ¶
func New(
	store Store,
	tokenSvc TokenService,
	interval time.Duration,
	batchSize, concurrency int,
	metrics *Metrics,
	logger *zap.Logger,
) *Submitter
New creates a Submitter.
- interval is the tick spacing between drains.
- batchSize caps how many pending entries are fetched per tick (0 = unlimited). The cap keeps a backlog from loading the entire pending queue into memory; the next tick picks up whatever is left.
- concurrency is the worker-pool width: how many Canton transfers run in parallel within one tick (<= 0 defaults to defaultConcurrency so a misconfiguration never silently disables the worker).
- metrics receives Prometheus observations from the drain and per-entry paths. Tests can pass NewNopMetrics() to ignore them.
The per-call Canton timeout (cantonCallTimeout, 60s) is fixed at package level: it is a property of the Canton SLO, not a per-deployment knob.