Documentation
¶
Index ¶
- Variables
- type Config
- type EVMMultidownloader
- func (dh *EVMMultidownloader) BlockNumber(ctx context.Context, finality aggkittypes.BlockNumberFinality) (uint64, error)
- func (dh *EVMMultidownloader) ChainID(ctx context.Context) (uint64, error)
- func (dh *EVMMultidownloader) CheckValidBlock(ctx context.Context, blockNumber uint64, blockHash common.Hash) (bool, uint64, error)
- func (dh *EVMMultidownloader) EthClient() aggkittypes.BaseEthereumClienter
- func (dh *EVMMultidownloader) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error)
- func (dh *EVMMultidownloader) Finality() aggkittypes.BlockNumberFinality
- func (dh *EVMMultidownloader) GetFinalizedBlockNumber(ctx context.Context) (uint64, error)
- func (dh *EVMMultidownloader) GetLatestBlockNumber(ctx context.Context) (uint64, error)
- func (dh *EVMMultidownloader) GetRPCServices() []jRPC.Service
- func (dh *EVMMultidownloader) GetReorgedDataByReorgID(ctx context.Context, reorgID uint64) (*mdrtypes.ReorgData, error)
- func (dh *EVMMultidownloader) HeaderByNumber(ctx context.Context, number *aggkittypes.BlockNumberFinality) (*aggkittypes.BlockHeader, error)
- func (dh *EVMMultidownloader) Initialize(ctx context.Context) error
- func (dh *EVMMultidownloader) IsAvailable(query mdrtypes.LogQuery) bool
- func (dh *EVMMultidownloader) IsInitialized() bool
- func (dh *EVMMultidownloader) IsPartiallyAvailable(query mdrtypes.LogQuery) (bool, *mdrtypes.LogQuery)
- func (dh *EVMMultidownloader) LogQuery(ctx context.Context, query mdrtypes.LogQuery) (mdrtypes.LogQueryResponse, error)
- func (dh *EVMMultidownloader) RegisterSyncer(data aggkittypes.SyncerConfig) error
- func (dh *EVMMultidownloader) ShowStatistics(iteration int)
- func (dh *EVMMultidownloader) Start(ctx context.Context) error
- func (dh *EVMMultidownloader) StartStep(ctx context.Context) error
- func (dh *EVMMultidownloader) StepSafe(ctx context.Context) (bool, error)
- func (dh *EVMMultidownloader) StepUnsafe(ctx context.Context) (bool, error)
- func (dh *EVMMultidownloader) Stop(ctx context.Context) error
- func (dh *EVMMultidownloader) StorageHeaderByNumber(ctx context.Context, number *aggkittypes.BlockNumberFinality) (*aggkittypes.BlockHeader, mdrtypes.FinalizedType, error)
- func (dh *EVMMultidownloader) WaitForNewLatestBlocks(ctx context.Context) error
- type EVMMultidownloaderDebug
- type EVMMultidownloaderRPC
- type ReorgPort
- func (r *ReorgPort) GetBlockNumberInRPC(ctx context.Context, blockFinality aggkittypes.BlockNumberFinality) (uint64, error)
- func (r *ReorgPort) GetBlockStorageAndRPC(ctx context.Context, tx dbtypes.Querier, blockNumber uint64) (*mdtypes.CompareBlockHeaders, error)
- func (r *ReorgPort) GetLastBlockNumberInStorage(tx dbtypes.Querier) (uint64, error)
- func (r *ReorgPort) MoveReorgedBlocks(tx dbtypes.Querier, reorgData mdtypes.ReorgData) (uint64, error)
- func (r *ReorgPort) NewTx(ctx context.Context) (dbtypes.Txer, error)
- func (r *ReorgPort) TimeNowUnix() uint64
- type ReorgProcessor
- type State
- func (s *State) Clone() *State
- func (s *State) CompletionPercentage() map[common.Address]float64
- func (s *State) ExtendPendingRange(mapBlocks map[aggkittypes.BlockNumberFinality]uint64, ...) error
- func (s *State) GetAddressesToSyncForBlockNumber(blockNumber uint64) []common.Address
- func (s *State) GetHighestBlockNumberPendingToSync() (uint64, aggkittypes.BlockNumberFinality)
- func (s *State) GetTotalPendingBlockRange() *aggkitcommon.BlockRange
- func (s *State) IsAvailable(query mdrtypes.LogQuery) bool
- func (s *State) IsPartiallyAvailable(query mdrtypes.LogQuery) (bool, *mdrtypes.LogQuery)
- func (s *State) IsSyncFinished() bool
- func (s *State) NextQueryToSync(syncBlockChunkSize uint32, maxBlockNumber uint64, applyMaxBlockNumber bool) (*mdrtypes.LogQuery, error)
- func (s *State) OnNewSyncedLogQuery(logQuery *mdrtypes.LogQuery) error
- func (s *State) String() string
- func (s *State) SyncedSegmentsByContract(addrs []common.Address) []mdrtypes.SyncSegment
- func (s *State) TotalBlocksPendingToSync() uint64
- type Statistics
- func (s *Statistics) ETA(pendingBlocks uint64) time.Duration
- func (s *Statistics) ElapsedSyncing() time.Duration
- func (s *Statistics) FinishDBOperation()
- func (s *Statistics) FinishEthCall(err error, numLogs uint64, numBlocks uint64)
- func (s *Statistics) FinishSyncing()
- func (s *Statistics) LaunchedEthCall()
- func (s *Statistics) Show(logFunc func(format string, args ...interface{}), iteration int)
- func (s *Statistics) StartDBOperation()
- func (s *Statistics) StartSyncing()
Constants ¶
This section is empty.
Variables ¶
var ( // ErrInvalidBlockChunkSize is returned when the block chunk size is invalid ErrInvalidBlockChunkSize = errors.New("MultidownloaderConfig.BlockChunkSize: block chunk size must be greater than 0") // ErrInvalidMaxParallelBlockHeaderRetrieval is returned when the max parallel block header retrieval is invalid ErrInvalidMaxParallelBlockHeaderRetrieval = errors.New("MultidownloaderConfig.MaxParallelBlockHeaderRetrieval:" + " max parallel block header retrieval must be greater than 0") // ErrInvalidWaitPeriodToCheckCatchUp is returned when the wait period to check catch up is invalid ErrInvalidWaitPeriodToCheckCatchUp = errors.New("MultidownloaderConfig.WaitPeriodToCheckCatchUp: " + "wait period to check catch up must be greater than 0") )
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// Enabled indicates if the multidownloader is enabled
Enabled bool
// StoragePath is the path to the storage
StoragePath string
// BlockChunkSize is the number of blocks to query in each FilterLogs call
BlockChunkSize uint32
// MaxParallelBlockHeaderRetrieval is the maximum number of parallel RPC calls to retrieve block headers
MaxParallelBlockHeaderRetrieval int
// BlockFinality is which block to consider final (typically finalizedBlock)
BlockFinality aggkittypes.BlockNumberFinality
// WaitPeriodToCheckCatchUp is the duration to wait before checking again if logs are not yet available
WaitPeriodToCheckCatchUp types.Duration
// PeriodToCheckReorgs is the duration to wait before checking for reorgs
// If is 0 reorgs are checked only when a new block appears
PeriodToCheckReorgs types.Duration
// DeveloperMode enables developer mode features like forcing reorgs
DeveloperMode bool
}
func NewConfigDefault ¶
type EVMMultidownloader ¶
type EVMMultidownloader struct {
// contains filtered or unexported fields
}
func NewEVMMultidownloader ¶
func NewEVMMultidownloader(log aggkitcommon.Logger, cfg Config, name string, ethClient aggkittypes.BaseEthereumClienter, rpcClient aggkittypes.RPCClienter, storageDB mdrtypes.Storager, blockNotifierManager ethermantypes.BlockNotifierManager, reorgProcessor mdrtypes.ReorgProcessor, ) (*EVMMultidownloader, error)
NewEVMMultidownloader creates a new EVM multidownloader instance with proper validation
func (*EVMMultidownloader) BlockNumber ¶
func (dh *EVMMultidownloader) BlockNumber(ctx context.Context, finality aggkittypes.BlockNumberFinality) (uint64, error)
BlockNumber gets the block number for the given finality type
func (*EVMMultidownloader) ChainID ¶
func (dh *EVMMultidownloader) ChainID(ctx context.Context) (uint64, error)
ChainID gets the chain ID directly from ethClient
func (*EVMMultidownloader) CheckValidBlock ¶
func (dh *EVMMultidownloader) CheckValidBlock(ctx context.Context, blockNumber uint64, blockHash common.Hash) (bool, uint64, error)
CheckValidBlock checks if the given blockNumber and blockHash are still valid returns: isValid bool, reorgID uint64, err error
func (*EVMMultidownloader) EthClient ¶
func (dh *EVMMultidownloader) EthClient() aggkittypes.BaseEthereumClienter
EthClient returns the underlying eth client
func (*EVMMultidownloader) FilterLogs ¶
func (dh *EVMMultidownloader) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error)
FilterLogs filters the logs. It gets them from storage or waits until they are available
func (*EVMMultidownloader) Finality ¶
func (dh *EVMMultidownloader) Finality() aggkittypes.BlockNumberFinality
func (*EVMMultidownloader) GetFinalizedBlockNumber ¶
func (dh *EVMMultidownloader) GetFinalizedBlockNumber(ctx context.Context) (uint64, error)
func (*EVMMultidownloader) GetLatestBlockNumber ¶
func (dh *EVMMultidownloader) GetLatestBlockNumber(ctx context.Context) (uint64, error)
func (*EVMMultidownloader) GetRPCServices ¶
func (dh *EVMMultidownloader) GetRPCServices() []jRPC.Service
func (*EVMMultidownloader) GetReorgedDataByReorgID ¶
func (*EVMMultidownloader) HeaderByNumber ¶
func (dh *EVMMultidownloader) HeaderByNumber(ctx context.Context, number *aggkittypes.BlockNumberFinality) (*aggkittypes.BlockHeader, error)
HeaderByNumber gets the block header for the given block number from storage or ethClient
func (*EVMMultidownloader) Initialize ¶
func (dh *EVMMultidownloader) Initialize(ctx context.Context) error
Initialize initializes the multidownloader. At this point all syncers must be registered and it will prepare the pendingSync segments
func (*EVMMultidownloader) IsAvailable ¶
func (dh *EVMMultidownloader) IsAvailable(query mdrtypes.LogQuery) bool
func (*EVMMultidownloader) IsInitialized ¶
func (dh *EVMMultidownloader) IsInitialized() bool
func (*EVMMultidownloader) IsPartiallyAvailable ¶
func (dh *EVMMultidownloader) IsPartiallyAvailable(query mdrtypes.LogQuery) (bool, *mdrtypes.LogQuery)
Check if the given log query is partially available
func (*EVMMultidownloader) LogQuery ¶
func (dh *EVMMultidownloader) LogQuery(ctx context.Context, query mdrtypes.LogQuery) (mdrtypes.LogQueryResponse, error)
func (*EVMMultidownloader) RegisterSyncer ¶
func (dh *EVMMultidownloader) RegisterSyncer(data aggkittypes.SyncerConfig) error
RegisterSyncer registers a new syncer config to the multidownloader. it must be called before initialization or Start
func (*EVMMultidownloader) ShowStatistics ¶
func (dh *EVMMultidownloader) ShowStatistics(iteration int)
func (*EVMMultidownloader) StartStep ¶
func (dh *EVMMultidownloader) StartStep(ctx context.Context) error
func (*EVMMultidownloader) StepSafe ¶
func (dh *EVMMultidownloader) StepSafe(ctx context.Context) (bool, error)
StepSafe performs a safe step syncing logs and block headers from historical data Returns true when syncing is complete, false if more work remains
func (*EVMMultidownloader) StepUnsafe ¶
func (dh *EVMMultidownloader) StepUnsafe(ctx context.Context) (bool, error)
func (*EVMMultidownloader) Stop ¶
func (dh *EVMMultidownloader) Stop(ctx context.Context) error
Stop gracefully stops the multidownloader if it's running
func (*EVMMultidownloader) StorageHeaderByNumber ¶
func (dh *EVMMultidownloader) StorageHeaderByNumber(ctx context.Context, number *aggkittypes.BlockNumberFinality) (*aggkittypes.BlockHeader, mdrtypes.FinalizedType, error)
HeaderByNumber gets the block header for the given block number from storage
func (*EVMMultidownloader) WaitForNewLatestBlocks ¶
func (dh *EVMMultidownloader) WaitForNewLatestBlocks(ctx context.Context) error
type EVMMultidownloaderDebug ¶
type EVMMultidownloaderDebug struct {
// contains filtered or unexported fields
}
func NewEVMMultidownloaderDebug ¶
func NewEVMMultidownloaderDebug() *EVMMultidownloaderDebug
func (*EVMMultidownloaderDebug) ForceReorg ¶
func (dh *EVMMultidownloaderDebug) ForceReorg(mismatchingBlockNumber uint64)
func (*EVMMultidownloaderDebug) GetInjectedStartStepError ¶
func (dh *EVMMultidownloaderDebug) GetInjectedStartStepError() error
type EVMMultidownloaderRPC ¶
type EVMMultidownloaderRPC struct {
// contains filtered or unexported fields
}
func NewEVMMultidownloaderRPC ¶
func NewEVMMultidownloaderRPC( logger aggkitcommon.Logger, downloader *EVMMultidownloader, ) *EVMMultidownloaderRPC
func (*EVMMultidownloaderRPC) Reorg ¶
func (b *EVMMultidownloaderRPC) Reorg(mismatchingBlockNumber uint64) (interface{}, rpc.Error)
func (*EVMMultidownloaderRPC) Status ¶
func (b *EVMMultidownloaderRPC) Status() (interface{}, rpc.Error)
Status returns the status of the L1InfoTreeSync component curl -X POST http://localhost:5576/ "Content-Type: application/json" \ -d '{"method":"multidownloader-l1_status", "params":[], "id":1}'
type ReorgPort ¶
type ReorgPort struct {
// contains filtered or unexported fields
}
func (*ReorgPort) GetBlockNumberInRPC ¶
func (r *ReorgPort) GetBlockNumberInRPC( ctx context.Context, blockFinality aggkittypes.BlockNumberFinality, ) (uint64, error)
func (*ReorgPort) GetBlockStorageAndRPC ¶
func (*ReorgPort) GetLastBlockNumberInStorage ¶
func (*ReorgPort) MoveReorgedBlocks ¶
func (*ReorgPort) TimeNowUnix ¶
type ReorgProcessor ¶
type ReorgProcessor struct {
// contains filtered or unexported fields
}
func NewReorgProcessor ¶
func NewReorgProcessor(log aggkitcommon.Logger, ethClient aggkittypes.BaseEthereumClienter, rpcClient aggkittypes.RPCClienter, storage mdtypes.Storager, developerMode bool) *ReorgProcessor
func (*ReorgProcessor) ProcessReorg ¶
func (rm *ReorgProcessor) ProcessReorg(ctx context.Context, detectedReorgError mdtypes.DetectedReorgError, finalizedBlockTag aggkittypes.BlockNumberFinality) error
After detecting a reorg at detectedReorgError.OffendingBlockNumber, - find affected blocks - store the reorg info in storage params:
- detectedReorgError: the error returned by the reorg detection logic, containing the offending block number and the reason for the reorg detection
- finalizedBlockTag: the block tag to consider as finalized (typically finalizedBlock)
type State ¶
type State struct {
// These are the segments that we have already synced
// when a syncer does a `FilterLogs`, it is used to check what is already synced
Synced mdrtypes.SetSyncSegment
// These are the segments that we need to sync
Pending mdrtypes.SetSyncSegment
}
State represents the current state of the multidownloader, it contains the segments that are already synced and the segments that are pending to be synced
func NewEmptyState ¶
func NewEmptyState() *State
NewEmptyState creates a new State with empty synced and pending segments
func NewState ¶
func NewState(synced *mdrtypes.SetSyncSegment, pending *mdrtypes.SetSyncSegment) *State
NewState creates a new State with the given synced and pending segments
func NewStateFromStorageSyncedBlocks ¶
func NewStateFromStorageSyncedBlocks(storageSynced mdrtypes.SetSyncSegment, totalToSync mdrtypes.SetSyncSegment) (*State, error)
NewStateFromStorageSyncedBlocks creates a new State from the given storage synced blocks and total to sync blocks
func (*State) Clone ¶
Clone creates a deep copy of the State This ensures that modifications to the cloned state don't affect the original
func (*State) CompletionPercentage ¶
func (*State) ExtendPendingRange ¶
func (s *State) ExtendPendingRange( mapBlocks map[aggkittypes.BlockNumberFinality]uint64, syncersConfig *mdrtypes.SetSyncerConfig) error
func (*State) GetAddressesToSyncForBlockNumber ¶
GetAddressesToSyncForBlockNumber returns the list of addresses that have pending segments for the given block number
func (*State) GetHighestBlockNumberPendingToSync ¶
func (s *State) GetHighestBlockNumberPendingToSync() (uint64, aggkittypes.BlockNumberFinality)
GetHighestBlockNumberPendingToSync returns the highest block number that is pending to be synced
func (*State) GetTotalPendingBlockRange ¶
func (s *State) GetTotalPendingBlockRange() *aggkitcommon.BlockRange
GetTotalPendingBlockRange returns the total block range that is pending to be synced
func (*State) IsAvailable ¶
IsAvailable checks if the given LogQuery is fully available in the synced segments
func (*State) IsPartiallyAvailable ¶
IsPartiallyAvailable checks if the given LogQuery is partially available in the synced segments
func (*State) IsSyncFinished ¶
IsSyncFinished returns true if there are no more segments pending to be synced
func (*State) NextQueryToSync ¶
func (s *State) NextQueryToSync(syncBlockChunkSize uint32, maxBlockNumber uint64, applyMaxBlockNumber bool) (*mdrtypes.LogQuery, error)
NextQueryToSync returns the next LogQuery to sync based on the pending segments and the given chunk size
func (*State) OnNewSyncedLogQuery ¶
OnNewSyncedLogQuery updates the state to mark a LogQuery as synced This function is transactional - if either operation fails, the state remains unchanged
func (*State) SyncedSegmentsByContract ¶
func (s *State) SyncedSegmentsByContract(addrs []common.Address) []mdrtypes.SyncSegment
SyncedSegmentsByContract returns the list of synced segments for the given contract addresses
func (*State) TotalBlocksPendingToSync ¶
TotalBlocksPendingToSync returns the total number of blocks that are pending to be synced
type Statistics ¶
type Statistics struct {
// contains filtered or unexported fields
}
func NewStatistics ¶
func NewStatistics() *Statistics
func (*Statistics) ElapsedSyncing ¶
func (s *Statistics) ElapsedSyncing() time.Duration
func (*Statistics) FinishDBOperation ¶
func (s *Statistics) FinishDBOperation()
func (*Statistics) FinishEthCall ¶
func (s *Statistics) FinishEthCall(err error, numLogs uint64, numBlocks uint64)
func (*Statistics) FinishSyncing ¶
func (s *Statistics) FinishSyncing()
func (*Statistics) LaunchedEthCall ¶
func (s *Statistics) LaunchedEthCall()
func (*Statistics) Show ¶
func (s *Statistics) Show(logFunc func(format string, args ...interface{}), iteration int)
func (*Statistics) StartDBOperation ¶
func (s *Statistics) StartDBOperation()
func (*Statistics) StartSyncing ¶
func (s *Statistics) StartSyncing()