engine

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BridgeStore

type BridgeStore interface {
	// CreateTransfer inserts a new transfer record. Returns true if the record was newly
	// inserted, false if it already existed (idempotent via ON CONFLICT DO NOTHING).
	CreateTransfer(ctx context.Context, transfer *relayer.Transfer) (bool, error)
	GetTransfer(ctx context.Context, id string) (*relayer.Transfer, error)
	// UpdateTransferStatus updates status, destination tx hash, and optionally error message.
	UpdateTransferStatus(ctx context.Context, id string, status relayer.TransferStatus, destTxHash *string, errMsg *string) error
	// IncrementRetryCount atomically increments the retry_count for a transfer.
	IncrementRetryCount(ctx context.Context, id string) error
	GetChainState(ctx context.Context, chainID string) (*relayer.ChainState, error)
	SetChainState(ctx context.Context, chainID string, blockNumber uint64, offset string) error
	GetPendingTransfers(ctx context.Context, direction relayer.TransferDirection) ([]*relayer.Transfer, error)
	ListTransfers(ctx context.Context, limit int) ([]*relayer.Transfer, error)
}

BridgeStore defines the narrow data-access interface consumed by the relayer.

type CantonBridgeClient

type CantonBridgeClient interface {
	canton.Bridge
}

CantonBridgeClient defines the Canton bridge interactions consumed by Engine.

type CantonDestination

type CantonDestination struct {
	// contains filtered or unexported fields
}

CantonDestination implements Destination for the Canton ledger.

func NewCantonDestination

func NewCantonDestination(client canton.Bridge, chainID string, logger *zap.Logger) *CantonDestination

NewCantonDestination creates a new Canton destination.

func (*CantonDestination) GetChainID

func (d *CantonDestination) GetChainID() string

GetChainID returns the chain identifier.

func (*CantonDestination) SubmitTransfer

func (d *CantonDestination) SubmitTransfer(ctx context.Context, event *relayer.Event) (string, bool, error)

SubmitTransfer deposits EVM tokens on Canton (creates and processes a PendingDeposit).

type Destination

type Destination interface {
	SubmitTransfer(ctx context.Context, event *relayer.Event) (destTxHash string, skipped bool, err error)
	GetChainID() string
}

Destination defines the interface for submitting transfers to a chain. SubmitTransfer returns (destTxHash, skipped, err). skipped=true means the transfer was already processed on the destination chain (idempotent, treat as success).

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine orchestrates the bridge relayer operations.

func NewEngine

func NewEngine(
	cfg *relayer.Config,
	cantonClient CantonBridgeClient,
	ethClient EthereumBridgeClient,
	store BridgeStore,
	metrics *Metrics,
	logger *zap.Logger,
) *Engine

NewEngine creates a new relayer engine.

func (*Engine) IsReady

func (e *Engine) IsReady() bool

IsReady returns true once both Canton and Ethereum processors have caught up to head.

func (*Engine) Start

func (e *Engine) Start(ctx context.Context) error

Start starts the relayer engine. It wraps ctx so that Stop() can cancel all goroutines.

func (*Engine) Stop

func (e *Engine) Stop()

Stop cancels all goroutines started by Start and waits for them to finish.

type ErrorCategory

type ErrorCategory string

ErrorCategory represents the category of an error in ErrorsTotal.

const (
	ErrorCategoryProcessing ErrorCategory = "processing"
)

type EthereumBridgeClient

type EthereumBridgeClient interface {
	GetLatestBlockNumber(ctx context.Context) (uint64, error)
	WithdrawFromCanton(
		ctx context.Context,
		token common.Address,
		recipient common.Address,
		amount *big.Int,
		nonce *big.Int,
		cantonTxHash [32]byte,
	) (common.Hash, error)
	WatchDepositEvents(ctx context.Context, fromBlock uint64, handler func(*ethereum.DepositEvent) error) error
	IsWithdrawalProcessed(ctx context.Context, cantonTxHash [32]byte) (bool, error)
	GetLastScannedBlock() uint64
}

EthereumBridgeClient defines the interface for Ethereum interactions.

type EthereumDestination

type EthereumDestination struct {
	// contains filtered or unexported fields
}

EthereumDestination implements Destination for Ethereum.

func NewEthereumDestination

func NewEthereumDestination(client EthereumBridgeClient, chainID string, logger *zap.Logger) *EthereumDestination

NewEthereumDestination creates a new Ethereum destination.

func (*EthereumDestination) GetChainID

func (d *EthereumDestination) GetChainID() string

GetChainID returns the chain identifier.

func (*EthereumDestination) SubmitTransfer

func (d *EthereumDestination) SubmitTransfer(ctx context.Context, event *relayer.Event) (string, bool, error)

SubmitTransfer releases tokens on Ethereum for a Canton withdrawal event.

type EventType

type EventType string

EventType represents the type of a bridge event.

const (
	EventTypeWithdrawal EventType = "withdrawal"
	EventTypeDeposit    EventType = "deposit"
)

type Metrics

type Metrics struct {

	// TransfersTotal counts completed/failed transfers by direction and status.
	TransfersTotal *prometheus.CounterVec

	// TransferDuration tracks transfer processing time by direction.
	TransferDuration *prometheus.HistogramVec

	// TransferAmount observes token amounts per transfer.
	TransferAmount *prometheus.HistogramVec

	// TransactionsSent counts transactions sent to each chain by status.
	TransactionsSent *prometheus.CounterVec

	// EventsDetected counts events detected on each chain by event type.
	EventsDetected *prometheus.CounterVec

	// LastProcessedBlock tracks the last processed block number per chain.
	LastProcessedBlock *prometheus.GaugeVec

	// ProcessorRestarts counts processor restart events by chain and reason.
	ProcessorRestarts *prometheus.CounterVec

	// EventProcessingErrors counts errors at each processing stage.
	EventProcessingErrors *prometheus.CounterVec

	// ErrorsTotal counts errors by component and error type.
	ErrorsTotal *prometheus.CounterVec

	// PendingTransfers tracks number of pending transfers by direction (gauge).
	PendingTransfers *prometheus.GaugeVec

	// ReconciliationDuration tracks how long each reconciliation run takes.
	ReconciliationDuration prometheus.Histogram

	// ReconciliationRuns counts reconciliation runs by result.
	ReconciliationRuns *prometheus.CounterVec

	// TransferRetries counts transfer retry attempts by direction and outcome.
	TransferRetries *prometheus.CounterVec

	// TransferAge tracks the full lifecycle duration of a transfer (created → completed).
	// TODO: Use source chain block timestamp instead of event detection time for true
	// on-chain age. Requires adding SourceTimestamp to relayer.Event, fetching block
	// headers in ethereumSource, and extracting RecordTime in cantonSource.
	TransferAge *prometheus.HistogramVec

	// ChainHeadPosition tracks the remote chain head for each chain
	// (block number for Ethereum, ledger offset for Canton).
	ChainHeadPosition *prometheus.GaugeVec

	// Ready indicates whether the relayer engine is fully synced (1=yes, 0=no).
	Ready prometheus.Gauge
}

Metrics holds all Prometheus collectors for the relayer engine. Create with NewMetrics and inject into Engine / Processor / Source via constructors.

func NewMetrics

NewMetrics registers relayer engine metrics against the given registerer.

func NewNopMetrics

func NewNopMetrics() *Metrics

NewNopMetrics returns a Metrics instance backed by a throwaway registry. Use in tests where metric values are not asserted.

func (*Metrics) IncErrorsTotal

func (m *Metrics) IncErrorsTotal(component string, category ErrorCategory)

IncErrorsTotal increments the error counter for the given component and error category.

func (*Metrics) IncEventProcessingErrors

func (m *Metrics) IncEventProcessingErrors(chain string, stage ProcessingStage)

IncEventProcessingErrors increments the processing error counter for the given chain and stage.

func (*Metrics) IncEventsDetected

func (m *Metrics) IncEventsDetected(chain string, eventType EventType)

IncEventsDetected increments the event counter for the given chain and event type.

func (*Metrics) IncProcessorRestarts

func (m *Metrics) IncProcessorRestarts(chain string, reason RestartReason)

IncProcessorRestarts increments the processor restart counter for the given chain and reason.

func (*Metrics) IncReconciliationRuns

func (m *Metrics) IncReconciliationRuns(result ReconciliationResult)

IncReconciliationRuns increments the reconciliation run counter for the given result.

func (*Metrics) IncTransactionsSent

func (m *Metrics) IncTransactionsSent(chain string, status TxStatus)

IncTransactionsSent increments the transaction counter for the given chain and status.

func (*Metrics) IncTransferRetries

func (m *Metrics) IncTransferRetries(direction relayer.TransferDirection, outcome RetryOutcome)

IncTransferRetries increments the retry counter for the given direction and outcome.

func (*Metrics) IncTransfersTotal

func (m *Metrics) IncTransfersTotal(direction relayer.TransferDirection, result TransferResult)

IncTransfersTotal increments the transfer counter for the given direction and result.

func (*Metrics) ObserveTransferAge

func (m *Metrics) ObserveTransferAge(direction relayer.TransferDirection, seconds float64)

ObserveTransferAge records a transfer's lifecycle duration for the given direction.

func (*Metrics) ObserveTransferAmount

func (m *Metrics) ObserveTransferAmount(direction relayer.TransferDirection, token string, amount float64)

ObserveTransferAmount records a transfer amount for the given direction and token.

func (*Metrics) ObserveTransferDuration

func (m *Metrics) ObserveTransferDuration(direction relayer.TransferDirection) prometheus.Observer

ObserveTransferDuration returns the observer for transfer duration by direction.

func (*Metrics) SetChainHeadPosition

func (m *Metrics) SetChainHeadPosition(chain string, position float64)

SetChainHeadPosition sets the chain head position gauge for the given chain.

func (*Metrics) SetLastProcessedBlock

func (m *Metrics) SetLastProcessedBlock(chain string, block float64)

SetLastProcessedBlock sets the last processed block gauge for the given chain.

func (*Metrics) SetPendingTransfers

func (m *Metrics) SetPendingTransfers(direction relayer.TransferDirection, count float64)

SetPendingTransfers sets the pending transfer gauge for the given direction.

type OffsetUpdateFunc

type OffsetUpdateFunc func(ctx context.Context, chainID string, offset string) error

OffsetUpdateFunc is called after successfully processing an event to persist the offset.

type PostSubmitHook

type PostSubmitHook func(ctx context.Context, event *relayer.Event, destTxHash string) error

PostSubmitHook is called after a transfer is successfully submitted to the destination. It is best-effort: errors are logged but do not fail the transfer.

type ProcessingStage

type ProcessingStage string

ProcessingStage represents the stage at which a processing error occurred.

const (
	StageCreateTransfer ProcessingStage = "create_transfer"
	StageSubmit         ProcessingStage = "submit"
	StagePostSubmitHook ProcessingStage = "post_submit_hook"
)

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor orchestrates the transfer process from Source to Destination.

func NewProcessor

func NewProcessor(
	source Source,
	destination Destination,
	store BridgeStore,
	metrics *Metrics,
	logger *zap.Logger,
	metricsName string,
	direction relayer.TransferDirection,
) *Processor

NewProcessor creates a new transfer processor.

func (*Processor) Start

func (p *Processor) Start(ctx context.Context, startOffset string) error

Start starts the processor, streaming events from startOffset until ctx is canceled.

func (*Processor) WithOffsetUpdate

func (p *Processor) WithOffsetUpdate(fn OffsetUpdateFunc) *Processor

WithOffsetUpdate sets the callback for persisting offsets after event processing.

func (*Processor) WithPostSubmit

func (p *Processor) WithPostSubmit(fn PostSubmitHook) *Processor

WithPostSubmit sets a best-effort hook called after each successful transfer submission.

type ReconciliationResult

type ReconciliationResult string

ReconciliationResult represents the result of a reconciliation run.

const (
	ReconciliationSuccess ReconciliationResult = "success"
	ReconciliationError   ReconciliationResult = "error"
)

type RestartReason

type RestartReason string

RestartReason represents the reason a processor was restarted.

const (
	RestartReasonError        RestartReason = "error"
	RestartReasonStreamClosed RestartReason = "stream_closed"
)

type RetryOutcome

type RetryOutcome string

RetryOutcome represents the result of a transfer retry attempt.

const (
	RetryOutcomeSuccess     RetryOutcome = "success"
	RetryOutcomeFailed      RetryOutcome = "failed"
	RetryOutcomeMaxExceeded RetryOutcome = "max_exceeded"
)

type Source

type Source interface {
	StreamEvents(ctx context.Context, offset string) (<-chan *relayer.Event, <-chan error)
	GetChainID() string
	// ExtractOffset returns the offset string to persist after processing event.
	// Returns "" when no offset should be saved for this event.
	ExtractOffset(event *relayer.Event) string
}

Source defines the interface for streaming events from a chain.

func NewCantonSource

func NewCantonSource(client canton.Bridge, tokenContract, chainID string, metrics *Metrics) Source

NewCantonSource creates a new Canton event source.

func NewEthereumSource

func NewEthereumSource(client EthereumBridgeClient, chainID string, metrics *Metrics) Source

NewEthereumSource creates a new Ethereum event source.

type TransferResult

type TransferResult string

TransferResult represents the final status of a completed transfer.

const (
	TransferResultCompleted TransferResult = "completed"
)

type TxStatus

type TxStatus string

TxStatus represents the outcome status of a transaction or transfer.

const (
	TxStatusSuccess TxStatus = "success"
	TxStatusFailed  TxStatus = "failed"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL