Documentation
¶
Index ¶
- type BridgeStore
- type CantonBridgeClient
- type CantonDestination
- type Destination
- type Engine
- type ErrorCategory
- type EthereumBridgeClient
- type EthereumDestination
- type EventType
- type Metrics
- func (m *Metrics) IncErrorsTotal(component string, category ErrorCategory)
- func (m *Metrics) IncEventProcessingErrors(chain string, stage ProcessingStage)
- func (m *Metrics) IncEventsDetected(chain string, eventType EventType)
- func (m *Metrics) IncProcessorRestarts(chain string, reason RestartReason)
- func (m *Metrics) IncReconciliationRuns(result ReconciliationResult)
- func (m *Metrics) IncTransactionsSent(chain string, status TxStatus)
- func (m *Metrics) IncTransferRetries(direction relayer.TransferDirection, outcome RetryOutcome)
- func (m *Metrics) IncTransfersTotal(direction relayer.TransferDirection, result TransferResult)
- func (m *Metrics) ObserveTransferAge(direction relayer.TransferDirection, seconds float64)
- func (m *Metrics) ObserveTransferAmount(direction relayer.TransferDirection, token string, amount float64)
- func (m *Metrics) ObserveTransferDuration(direction relayer.TransferDirection) prometheus.Observer
- func (m *Metrics) SetChainHeadPosition(chain string, position float64)
- func (m *Metrics) SetLastProcessedBlock(chain string, block float64)
- func (m *Metrics) SetPendingTransfers(direction relayer.TransferDirection, count float64)
- type OffsetUpdateFunc
- type PostSubmitHook
- type ProcessingStage
- type Processor
- type ReconciliationResult
- type RestartReason
- type RetryOutcome
- type Source
- type TransferResult
- type TxStatus
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BridgeStore ¶
type BridgeStore interface {
// CreateTransfer inserts a new transfer record. Returns true if the record was newly
// inserted, false if it already existed (idempotent via ON CONFLICT DO NOTHING).
CreateTransfer(ctx context.Context, transfer *relayer.Transfer) (bool, error)
GetTransfer(ctx context.Context, id string) (*relayer.Transfer, error)
// UpdateTransferStatus updates status, destination tx hash, and optionally error message.
UpdateTransferStatus(ctx context.Context, id string, status relayer.TransferStatus, destTxHash *string, errMsg *string) error
// IncrementRetryCount atomically increments the retry_count for a transfer.
IncrementRetryCount(ctx context.Context, id string) error
GetChainState(ctx context.Context, chainID string) (*relayer.ChainState, error)
SetChainState(ctx context.Context, chainID string, blockNumber uint64, offset string) error
GetPendingTransfers(ctx context.Context, direction relayer.TransferDirection) ([]*relayer.Transfer, error)
ListTransfers(ctx context.Context, limit int) ([]*relayer.Transfer, error)
}
BridgeStore defines the narrow data-access interface consumed by the relayer.
type CantonBridgeClient ¶
CantonBridgeClient defines the Canton bridge interactions consumed by Engine.
type CantonDestination ¶
type CantonDestination struct {
// contains filtered or unexported fields
}
CantonDestination implements Destination for the Canton ledger.
func NewCantonDestination ¶
func NewCantonDestination(client canton.Bridge, chainID string, logger *zap.Logger) *CantonDestination
NewCantonDestination creates a new Canton destination.
func (*CantonDestination) GetChainID ¶
func (d *CantonDestination) GetChainID() string
GetChainID returns the chain identifier.
func (*CantonDestination) SubmitTransfer ¶
func (d *CantonDestination) SubmitTransfer(ctx context.Context, event *relayer.Event) (string, bool, error)
SubmitTransfer deposits EVM tokens on Canton (creates and processes a PendingDeposit).
type Destination ¶
type Destination interface {
SubmitTransfer(ctx context.Context, event *relayer.Event) (destTxHash string, skipped bool, err error)
GetChainID() string
}
Destination defines the interface for submitting transfers to a chain. SubmitTransfer returns (destTxHash, skipped, err). skipped=true means the transfer was already processed on the destination chain (idempotent, treat as success).
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine orchestrates the bridge relayer operations.
func NewEngine ¶
func NewEngine(
cfg *relayer.Config,
cantonClient CantonBridgeClient,
ethClient EthereumBridgeClient,
store BridgeStore,
metrics *Metrics,
logger *zap.Logger,
) *Engine
NewEngine creates a new relayer engine.
func (*Engine) IsReady ¶
IsReady returns true once both Canton and Ethereum processors have caught up to head.
type ErrorCategory ¶
type ErrorCategory string
ErrorCategory represents the category of an error in ErrorsTotal.
const (
ErrorCategoryProcessing ErrorCategory = "processing"
)
type EthereumBridgeClient ¶
type EthereumBridgeClient interface {
GetLatestBlockNumber(ctx context.Context) (uint64, error)
WithdrawFromCanton(
ctx context.Context,
token common.Address,
recipient common.Address,
amount *big.Int,
nonce *big.Int,
cantonTxHash [32]byte,
) (common.Hash, error)
WatchDepositEvents(ctx context.Context, fromBlock uint64, handler func(*ethereum.DepositEvent) error) error
IsWithdrawalProcessed(ctx context.Context, cantonTxHash [32]byte) (bool, error)
GetLastScannedBlock() uint64
}
EthereumBridgeClient defines the interface for Ethereum interactions.
type EthereumDestination ¶
type EthereumDestination struct {
// contains filtered or unexported fields
}
EthereumDestination implements Destination for Ethereum.
func NewEthereumDestination ¶
func NewEthereumDestination(client EthereumBridgeClient, chainID string, logger *zap.Logger) *EthereumDestination
NewEthereumDestination creates a new Ethereum destination.
func (*EthereumDestination) GetChainID ¶
func (d *EthereumDestination) GetChainID() string
GetChainID returns the chain identifier.
func (*EthereumDestination) SubmitTransfer ¶
func (d *EthereumDestination) SubmitTransfer(ctx context.Context, event *relayer.Event) (string, bool, error)
SubmitTransfer releases tokens on Ethereum for a Canton withdrawal event.
type Metrics ¶
type Metrics struct {
// TransfersTotal counts completed/failed transfers by direction and status.
TransfersTotal *prometheus.CounterVec
// TransferDuration tracks transfer processing time by direction.
TransferDuration *prometheus.HistogramVec
// TransferAmount observes token amounts per transfer.
TransferAmount *prometheus.HistogramVec
// TransactionsSent counts transactions sent to each chain by status.
TransactionsSent *prometheus.CounterVec
// EventsDetected counts events detected on each chain by event type.
EventsDetected *prometheus.CounterVec
// LastProcessedBlock tracks the last processed block number per chain.
LastProcessedBlock *prometheus.GaugeVec
// ProcessorRestarts counts processor restart events by chain and reason.
ProcessorRestarts *prometheus.CounterVec
// EventProcessingErrors counts errors at each processing stage.
EventProcessingErrors *prometheus.CounterVec
// ErrorsTotal counts errors by component and error category.
ErrorsTotal *prometheus.CounterVec
// PendingTransfers tracks number of pending transfers by direction (gauge).
PendingTransfers *prometheus.GaugeVec
// ReconciliationDuration tracks how long each reconciliation run takes.
ReconciliationDuration prometheus.Histogram
// ReconciliationRuns counts reconciliation runs by result.
ReconciliationRuns *prometheus.CounterVec
// TransferRetries counts transfer retry attempts by direction and outcome.
TransferRetries *prometheus.CounterVec
// TransferAge tracks the full lifecycle duration of a transfer (created → completed).
// TODO: Use source chain block timestamp instead of event detection time for true
// on-chain age. Requires adding SourceTimestamp to relayer.Event, fetching block
// headers in ethereumSource, and extracting RecordTime in cantonSource.
TransferAge *prometheus.HistogramVec
// ChainHeadPosition tracks the remote chain head for each chain
// (block number for Ethereum, ledger offset for Canton).
ChainHeadPosition *prometheus.GaugeVec
// Ready indicates whether the relayer engine is fully synced (1=yes, 0=no).
Ready prometheus.Gauge
}
Metrics holds all Prometheus collectors for the relayer engine. Create with NewMetrics and inject into Engine / Processor / Source via constructors.
func NewMetrics ¶
func NewMetrics(reg sharedmetrics.NamespacedRegisterer) *Metrics
NewMetrics registers relayer engine metrics against the given registerer.
func NewNopMetrics ¶
func NewNopMetrics() *Metrics
NewNopMetrics returns a Metrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.
func (*Metrics) IncErrorsTotal ¶
func (m *Metrics) IncErrorsTotal(component string, category ErrorCategory)
IncErrorsTotal increments the error counter for the given component and error category.
func (*Metrics) IncEventProcessingErrors ¶
func (m *Metrics) IncEventProcessingErrors(chain string, stage ProcessingStage)
IncEventProcessingErrors increments the processing error counter for the given chain and stage.
func (*Metrics) IncEventsDetected ¶
func (m *Metrics) IncEventsDetected(chain string, eventType EventType)
IncEventsDetected increments the event counter for the given chain and event type.
func (*Metrics) IncProcessorRestarts ¶
func (m *Metrics) IncProcessorRestarts(chain string, reason RestartReason)
IncProcessorRestarts increments the processor restart counter for the given chain and reason.
func (*Metrics) IncReconciliationRuns ¶
func (m *Metrics) IncReconciliationRuns(result ReconciliationResult)
IncReconciliationRuns increments the reconciliation run counter for the given result.
func (*Metrics) IncTransactionsSent ¶
func (m *Metrics) IncTransactionsSent(chain string, status TxStatus)
IncTransactionsSent increments the transaction counter for the given chain and status.
func (*Metrics) IncTransferRetries ¶
func (m *Metrics) IncTransferRetries(direction relayer.TransferDirection, outcome RetryOutcome)
IncTransferRetries increments the retry counter for the given direction and outcome.
func (*Metrics) IncTransfersTotal ¶
func (m *Metrics) IncTransfersTotal(direction relayer.TransferDirection, result TransferResult)
IncTransfersTotal increments the transfer counter for the given direction and result.
func (*Metrics) ObserveTransferAge ¶
func (m *Metrics) ObserveTransferAge(direction relayer.TransferDirection, seconds float64)
ObserveTransferAge records a transfer's lifecycle duration for the given direction.
func (*Metrics) ObserveTransferAmount ¶
func (m *Metrics) ObserveTransferAmount(direction relayer.TransferDirection, token string, amount float64)
ObserveTransferAmount records a transfer amount for the given direction and token.
func (*Metrics) ObserveTransferDuration ¶
func (m *Metrics) ObserveTransferDuration(direction relayer.TransferDirection) prometheus.Observer
ObserveTransferDuration returns the observer for transfer duration by direction.
func (*Metrics) SetChainHeadPosition ¶
func (m *Metrics) SetChainHeadPosition(chain string, position float64)
SetChainHeadPosition sets the chain head position gauge for the given chain.
func (*Metrics) SetLastProcessedBlock ¶
func (m *Metrics) SetLastProcessedBlock(chain string, block float64)
SetLastProcessedBlock sets the last processed block gauge for the given chain.
func (*Metrics) SetPendingTransfers ¶
func (m *Metrics) SetPendingTransfers(direction relayer.TransferDirection, count float64)
SetPendingTransfers sets the pending transfer gauge for the given direction.
type OffsetUpdateFunc ¶
OffsetUpdateFunc is called after successfully processing an event to persist the offset.
type PostSubmitHook ¶
PostSubmitHook is called after a transfer is successfully submitted to the destination. It is best-effort: errors are logged but do not fail the transfer.
type ProcessingStage ¶
type ProcessingStage string
ProcessingStage represents the stage at which a processing error occurred.
const (
StageCreateTransfer ProcessingStage = "create_transfer"
StageSubmit ProcessingStage = "submit"
StagePostSubmitHook ProcessingStage = "post_submit_hook"
)
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor orchestrates the transfer process from Source to Destination.
func NewProcessor ¶
func NewProcessor(
source Source,
destination Destination,
store BridgeStore,
metrics *Metrics,
logger *zap.Logger,
metricsName string,
direction relayer.TransferDirection,
) *Processor
NewProcessor creates a new transfer processor.
func (*Processor) Start ¶
Start starts the processor, streaming events from startOffset until ctx is canceled.
func (*Processor) WithOffsetUpdate ¶
func (p *Processor) WithOffsetUpdate(fn OffsetUpdateFunc) *Processor
WithOffsetUpdate sets the callback for persisting offsets after event processing.
func (*Processor) WithPostSubmit ¶
func (p *Processor) WithPostSubmit(fn PostSubmitHook) *Processor
WithPostSubmit sets a best-effort hook called after each successful transfer submission.
type ReconciliationResult ¶
type ReconciliationResult string
ReconciliationResult represents the result of a reconciliation run.
const (
ReconciliationSuccess ReconciliationResult = "success"
ReconciliationError ReconciliationResult = "error"
)
type RestartReason ¶
type RestartReason string
RestartReason represents the reason a processor was restarted.
const (
RestartReasonError RestartReason = "error"
RestartReasonStreamClosed RestartReason = "stream_closed"
)
type RetryOutcome ¶
type RetryOutcome string
RetryOutcome represents the result of a transfer retry attempt.
const (
RetryOutcomeSuccess RetryOutcome = "success"
RetryOutcomeFailed RetryOutcome = "failed"
RetryOutcomeMaxExceeded RetryOutcome = "max_exceeded"
)
type Source ¶
type Source interface {
StreamEvents(ctx context.Context, offset string) (<-chan *relayer.Event, <-chan error)
GetChainID() string
// ExtractOffset returns the offset string to persist after processing event.
// Returns "" when no offset should be saved for this event.
ExtractOffset(event *relayer.Event) string
}
Source defines the interface for streaming events from a chain.
func NewCantonSource ¶
NewCantonSource creates a new Canton event source.
func NewEthereumSource ¶
func NewEthereumSource(client EthereumBridgeClient, chainID string, metrics *Metrics) Source
NewEthereumSource creates a new Ethereum event source.
type TransferResult ¶
type TransferResult string
TransferResult represents the final status of a completed transfer.
const (
TransferResultCompleted TransferResult = "completed"
)