multidownloader

package
v0.10.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: Apache-2.0, MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrInvalidBlockChunkSize is returned when the block chunk size is invalid
	ErrInvalidBlockChunkSize = errors.New("MultidownloaderConfig.BlockChunkSize: block chunk size must be greater than 0")
	// ErrInvalidMaxParallelBlockHeaderRetrieval is returned when the max parallel block header retrieval is invalid
	ErrInvalidMaxParallelBlockHeaderRetrieval = errors.New("MultidownloaderConfig.MaxParallelBlockHeaderRetrieval:" +
		" max parallel block header retrieval must be greater than 0")
	// ErrInvalidWaitPeriodToCheckCatchUp is returned when the wait period to check catch up is invalid
	ErrInvalidWaitPeriodToCheckCatchUp = errors.New("MultidownloaderConfig.WaitPeriodToCheckCatchUp: " +
		"wait period to check catch up must be greater than 0")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	// Enabled indicates if the multidownloader is enabled
	Enabled bool
	// StoragePath is the path to the storage
	StoragePath string
	// BlockChunkSize is the number of blocks to query in each FilterLogs call
	BlockChunkSize uint32
	// MaxParallelBlockHeaderRetrieval is the maximum number of parallel RPC calls to retrieve block headers
	MaxParallelBlockHeaderRetrieval int
	// BlockFinality is which block to consider final (typically finalizedBlock)
	BlockFinality aggkittypes.BlockNumberFinality
	// WaitPeriodToCheckCatchUp is the duration to wait before checking again if logs are not yet available
	WaitPeriodToCheckCatchUp types.Duration
	// PeriodToCheckReorgs is the duration to wait before checking for reorgs
	// If is 0 reorgs are checked only when a new block appears
	PeriodToCheckReorgs types.Duration
	// DeveloperMode enables developer mode features like forcing reorgs
	DeveloperMode bool
}

func NewConfigDefault

func NewConfigDefault(name string, basePathDB string) Config

func (*Config) String

func (cfg *Config) String() string

func (*Config) Validate

func (cfg *Config) Validate() error

type EVMMultidownloader

type EVMMultidownloader struct {
	// contains filtered or unexported fields
}

func NewEVMMultidownloader

func NewEVMMultidownloader(log aggkitcommon.Logger,
	cfg Config,
	name string,
	ethClient aggkittypes.BaseEthereumClienter,
	rpcClient aggkittypes.RPCClienter,
	storageDB mdrtypes.Storager,
	blockNotifierManager ethermantypes.BlockNotifierManager,
	reorgProcessor mdrtypes.ReorgProcessor,
) (*EVMMultidownloader, error)

NewEVMMultidownloader creates a new EVM multidownloader instance with proper validation

func (*EVMMultidownloader) BlockNumber

func (dh *EVMMultidownloader) BlockNumber(ctx context.Context,
	finality aggkittypes.BlockNumberFinality) (uint64, error)

BlockNumber gets the block number for the given finality type

func (*EVMMultidownloader) ChainID

func (dh *EVMMultidownloader) ChainID(ctx context.Context) (uint64, error)

ChainID gets the chain ID directly from ethClient

func (*EVMMultidownloader) CheckValidBlock

func (dh *EVMMultidownloader) CheckValidBlock(ctx context.Context, blockNumber uint64,
	blockHash common.Hash) (bool, uint64, error)

CheckValidBlock checks if the given blockNumber and blockHash are still valid returns: isValid bool, reorgID uint64, err error

func (*EVMMultidownloader) EthClient

EthClient returns the underlying eth client

func (*EVMMultidownloader) FilterLogs

func (dh *EVMMultidownloader) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error)

FilterLogs filters the logs. It gets them from storage or waits until they are available

func (*EVMMultidownloader) Finality

func (*EVMMultidownloader) GetFinalizedBlockNumber

func (dh *EVMMultidownloader) GetFinalizedBlockNumber(ctx context.Context) (uint64, error)

func (*EVMMultidownloader) GetLatestBlockNumber

func (dh *EVMMultidownloader) GetLatestBlockNumber(ctx context.Context) (uint64, error)

func (*EVMMultidownloader) GetRPCServices

func (dh *EVMMultidownloader) GetRPCServices() []jRPC.Service

func (*EVMMultidownloader) GetReorgedDataByReorgID

func (dh *EVMMultidownloader) GetReorgedDataByReorgID(ctx context.Context,
	reorgID uint64) (*mdrtypes.ReorgData, error)

func (*EVMMultidownloader) HeaderByNumber

HeaderByNumber gets the block header for the given block number from storage or ethClient

func (*EVMMultidownloader) Initialize

func (dh *EVMMultidownloader) Initialize(ctx context.Context) error

Initialize initializes the multidownloader. At this point all syncers must be registered and it will prepare the pendingSync segments

func (*EVMMultidownloader) IsAvailable

func (dh *EVMMultidownloader) IsAvailable(query mdrtypes.LogQuery) bool

func (*EVMMultidownloader) IsInitialized

func (dh *EVMMultidownloader) IsInitialized() bool

func (*EVMMultidownloader) IsPartiallyAvailable

func (dh *EVMMultidownloader) IsPartiallyAvailable(query mdrtypes.LogQuery) (bool, *mdrtypes.LogQuery)

Check if the given log query is partially available

func (*EVMMultidownloader) LogQuery

func (*EVMMultidownloader) RegisterSyncer

func (dh *EVMMultidownloader) RegisterSyncer(data aggkittypes.SyncerConfig) error

RegisterSyncer registers a new syncer config to the multidownloader. it must be called before initialization or Start

func (*EVMMultidownloader) ShowStatistics

func (dh *EVMMultidownloader) ShowStatistics(iteration int)

func (*EVMMultidownloader) Start

func (dh *EVMMultidownloader) Start(ctx context.Context) error

func (*EVMMultidownloader) StartStep

func (dh *EVMMultidownloader) StartStep(ctx context.Context) error

func (*EVMMultidownloader) StepSafe

func (dh *EVMMultidownloader) StepSafe(ctx context.Context) (bool, error)

StepSafe performs a safe step syncing logs and block headers from historical data Returns true when syncing is complete, false if more work remains

func (*EVMMultidownloader) StepUnsafe

func (dh *EVMMultidownloader) StepUnsafe(ctx context.Context) (bool, error)

func (*EVMMultidownloader) Stop

func (dh *EVMMultidownloader) Stop(ctx context.Context) error

Stop gracefully stops the multidownloader if it's running

func (*EVMMultidownloader) StorageHeaderByNumber

HeaderByNumber gets the block header for the given block number from storage

func (*EVMMultidownloader) WaitForNewLatestBlocks

func (dh *EVMMultidownloader) WaitForNewLatestBlocks(ctx context.Context) error

type EVMMultidownloaderDebug

type EVMMultidownloaderDebug struct {
	// contains filtered or unexported fields
}

func NewEVMMultidownloaderDebug

func NewEVMMultidownloaderDebug() *EVMMultidownloaderDebug

func (*EVMMultidownloaderDebug) ForceReorg

func (dh *EVMMultidownloaderDebug) ForceReorg(mismatchingBlockNumber uint64)

func (*EVMMultidownloaderDebug) GetInjectedStartStepError

func (dh *EVMMultidownloaderDebug) GetInjectedStartStepError() error

type EVMMultidownloaderRPC

type EVMMultidownloaderRPC struct {
	// contains filtered or unexported fields
}

func NewEVMMultidownloaderRPC

func NewEVMMultidownloaderRPC(
	logger aggkitcommon.Logger,
	downloader *EVMMultidownloader,
) *EVMMultidownloaderRPC

func (*EVMMultidownloaderRPC) Reorg

func (b *EVMMultidownloaderRPC) Reorg(mismatchingBlockNumber uint64) (interface{}, rpc.Error)

func (*EVMMultidownloaderRPC) Status

func (b *EVMMultidownloaderRPC) Status() (interface{}, rpc.Error)

Status returns the status of the L1InfoTreeSync component curl -X POST http://localhost:5576/ "Content-Type: application/json" \ -d '{"method":"multidownloader-l1_status", "params":[], "id":1}'

type ReorgPort

type ReorgPort struct {
	// contains filtered or unexported fields
}

func (*ReorgPort) GetBlockNumberInRPC

func (r *ReorgPort) GetBlockNumberInRPC(
	ctx context.Context, blockFinality aggkittypes.BlockNumberFinality,
) (uint64, error)

func (*ReorgPort) GetBlockStorageAndRPC

func (r *ReorgPort) GetBlockStorageAndRPC(ctx context.Context, tx dbtypes.Querier,
	blockNumber uint64) (*mdtypes.CompareBlockHeaders, error)

func (*ReorgPort) GetLastBlockNumberInStorage

func (r *ReorgPort) GetLastBlockNumberInStorage(tx dbtypes.Querier) (uint64, error)

func (*ReorgPort) MoveReorgedBlocks

func (r *ReorgPort) MoveReorgedBlocks(tx dbtypes.Querier, reorgData mdtypes.ReorgData) (uint64, error)

func (*ReorgPort) NewTx

func (r *ReorgPort) NewTx(ctx context.Context) (dbtypes.Txer, error)

func (*ReorgPort) TimeNowUnix

func (r *ReorgPort) TimeNowUnix() uint64

type ReorgProcessor

type ReorgProcessor struct {
	// contains filtered or unexported fields
}

func NewReorgProcessor

func NewReorgProcessor(log aggkitcommon.Logger,
	ethClient aggkittypes.BaseEthereumClienter,
	rpcClient aggkittypes.RPCClienter,
	storage mdtypes.Storager,
	developerMode bool) *ReorgProcessor

func (*ReorgProcessor) ProcessReorg

func (rm *ReorgProcessor) ProcessReorg(ctx context.Context,
	detectedReorgError mdtypes.DetectedReorgError,
	finalizedBlockTag aggkittypes.BlockNumberFinality) error

After detecting a reorg at detectedReorgError.OffendingBlockNumber, - find affected blocks - store the reorg info in storage params:

  • detectedReorgError: the error returned by the reorg detection logic, containing the offending block number and the reason for the reorg detection
  • finalizedBlockTag: the block tag to consider as finalized (typically finalizedBlock)

type State

type State struct {
	// These are the segments that we have already synced
	// when a syncer does a `FilterLogs`, it is used to check what is already synced
	Synced mdrtypes.SetSyncSegment
	// These are the  segments that we need to sync
	Pending mdrtypes.SetSyncSegment
}

State represents the current state of the multidownloader, it contains the segments that are already synced and the segments that are pending to be synced

func NewEmptyState

func NewEmptyState() *State

NewEmptyState creates a new State with empty synced and pending segments

func NewState

func NewState(synced *mdrtypes.SetSyncSegment,
	pending *mdrtypes.SetSyncSegment) *State

NewState creates a new State with the given synced and pending segments

func NewStateFromStorageSyncedBlocks

func NewStateFromStorageSyncedBlocks(storageSynced mdrtypes.SetSyncSegment,
	totalToSync mdrtypes.SetSyncSegment) (*State, error)

NewStateFromStorageSyncedBlocks creates a new State from the given storage synced blocks and total to sync blocks

func (*State) Clone

func (s *State) Clone() *State

Clone creates a deep copy of the State This ensures that modifications to the cloned state don't affect the original

func (*State) CompletionPercentage

func (s *State) CompletionPercentage() map[common.Address]float64

func (*State) ExtendPendingRange

func (s *State) ExtendPendingRange(
	mapBlocks map[aggkittypes.BlockNumberFinality]uint64,
	syncersConfig *mdrtypes.SetSyncerConfig) error

func (*State) GetAddressesToSyncForBlockNumber

func (s *State) GetAddressesToSyncForBlockNumber(blockNumber uint64) []common.Address

GetAddressesToSyncForBlockNumber returns the list of addresses that have pending segments for the given block number

func (*State) GetHighestBlockNumberPendingToSync

func (s *State) GetHighestBlockNumberPendingToSync() (uint64, aggkittypes.BlockNumberFinality)

GetHighestBlockNumberPendingToSync returns the highest block number that is pending to be synced

func (*State) GetTotalPendingBlockRange

func (s *State) GetTotalPendingBlockRange() *aggkitcommon.BlockRange

GetTotalPendingBlockRange returns the total block range that is pending to be synced

func (*State) IsAvailable

func (s *State) IsAvailable(query mdrtypes.LogQuery) bool

IsAvailable checks if the given LogQuery is fully available in the synced segments

func (*State) IsPartiallyAvailable

func (s *State) IsPartiallyAvailable(query mdrtypes.LogQuery) (bool, *mdrtypes.LogQuery)

IsPartiallyAvailable checks if the given LogQuery is partially available in the synced segments

func (*State) IsSyncFinished

func (s *State) IsSyncFinished() bool

IsSyncFinished returns true if there are no more segments pending to be synced

func (*State) NextQueryToSync

func (s *State) NextQueryToSync(syncBlockChunkSize uint32,
	maxBlockNumber uint64, applyMaxBlockNumber bool) (*mdrtypes.LogQuery, error)

NextQueryToSync returns the next LogQuery to sync based on the pending segments and the given chunk size

func (*State) OnNewSyncedLogQuery

func (s *State) OnNewSyncedLogQuery(logQuery *mdrtypes.LogQuery) error

OnNewSyncedLogQuery updates the state to mark a LogQuery as synced This function is transactional - if either operation fails, the state remains unchanged

func (*State) String

func (s *State) String() string

String returns a string representation of the State

func (*State) SyncedSegmentsByContract

func (s *State) SyncedSegmentsByContract(addrs []common.Address) []mdrtypes.SyncSegment

SyncedSegmentsByContract returns the list of synced segments for the given contract addresses

func (*State) TotalBlocksPendingToSync

func (s *State) TotalBlocksPendingToSync() uint64

TotalBlocksPendingToSync returns the total number of blocks that are pending to be synced

type Statistics

type Statistics struct {
	// contains filtered or unexported fields
}

func NewStatistics

func NewStatistics() *Statistics

func (*Statistics) ETA

func (s *Statistics) ETA(pendingBlocks uint64) time.Duration

func (*Statistics) ElapsedSyncing

func (s *Statistics) ElapsedSyncing() time.Duration

func (*Statistics) FinishDBOperation

func (s *Statistics) FinishDBOperation()

func (*Statistics) FinishEthCall

func (s *Statistics) FinishEthCall(err error, numLogs uint64, numBlocks uint64)

func (*Statistics) FinishSyncing

func (s *Statistics) FinishSyncing()

func (*Statistics) LaunchedEthCall

func (s *Statistics) LaunchedEthCall()

func (*Statistics) Show

func (s *Statistics) Show(logFunc func(format string, args ...interface{}), iteration int)

func (*Statistics) StartDBOperation

func (s *Statistics) StartDBOperation()

func (*Statistics) StartSyncing

func (s *Statistics) StartSyncing()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL