Documentation
¶
Index ¶
- Variables
- func IsDetectedReorgError(err error) bool
- func IsReorgedError(err error) bool
- type BlockWithLogs
- type CompareBlockHeaders
- type ContractConfig
- type DetectedReorgError
- type FinalizedType
- type Log
- type LogQuery
- type LogQueryResponse
- type ReorgData
- type ReorgDetectionReason
- type ReorgPorter
- type ReorgProcessor
- type ReorgedError
- type SetSyncSegment
- func (s *SetSyncSegment) Add(segment SyncSegment)
- func (f *SetSyncSegment) AddLogQuery(logQuery *LogQuery) error
- func (f *SetSyncSegment) Clone() *SetSyncSegment
- func (f *SetSyncSegment) Empty(segment *SyncSegment)
- func (f *SetSyncSegment) Finished() bool
- func (f *SetSyncSegment) GetAddressesForBlock(blockNumber uint64) []common.Address
- func (f *SetSyncSegment) GetAddressesForBlockRange(blockRange aggkitcommon.BlockRange) []common.Address
- func (s *SetSyncSegment) GetByContract(addr common.Address) (SyncSegment, bool)
- func (s *SetSyncSegment) GetContracts() []common.Address
- func (f *SetSyncSegment) GetHighestBlockNumber() (uint64, aggkittypes.BlockNumberFinality)
- func (f *SetSyncSegment) GetLowestFromBlockSegment() *SyncSegment
- func (s *SetSyncSegment) GetSegments() []SyncSegment
- func (f *SetSyncSegment) GetTargetToBlockTags() []aggkittypes.BlockNumberFinality
- func (f *SetSyncSegment) GetTotalPendingBlockRange() *aggkitcommon.BlockRange
- func (f *SetSyncSegment) IsAvailable(query LogQuery) bool
- func (f *SetSyncSegment) IsPartiallyAvailable(query LogQuery) (bool, *LogQuery)
- func (f *SetSyncSegment) NextQuery(syncBlockChunkSize uint32, maxBlockNumber uint64, applyMaxBlockNumber bool) (*LogQuery, error)
- func (f *SetSyncSegment) Remove(segmentToRemove *SyncSegment)
- func (s *SetSyncSegment) SegmentsByContract(addrs []common.Address) []SyncSegment
- func (s *SetSyncSegment) String() string
- func (f *SetSyncSegment) SubtractLogQuery(logQuery *LogQuery) error
- func (f *SetSyncSegment) SubtractSegments(segments *SetSyncSegment) error
- func (f *SetSyncSegment) TotalBlocks() uint64
- func (f *SetSyncSegment) UpdateBlockRange(segment *SyncSegment, newBlockRange aggkitcommon.BlockRange)
- type SetSyncerConfig
- func (f *SetSyncerConfig) Add(filter aggkittypes.SyncerConfig)
- func (f *SetSyncerConfig) Addresses(blockRange aggkitcommon.BlockRange) []common.Address
- func (f *SetSyncerConfig) Brief() string
- func (f *SetSyncerConfig) ContractConfigs() []ContractConfig
- func (f *SetSyncerConfig) GetTargetToBlockTags() []aggkittypes.BlockNumberFinality
- func (f *SetSyncerConfig) SyncSegments(blockNumbers map[aggkittypes.BlockNumberFinality]uint64) (*SetSyncSegment, error)
- type Storager
- type StoragerForReorg
- type SyncSegment
- func (s *SyncSegment) Clone() *SyncSegment
- func (s *SyncSegment) Empty()
- func (s SyncSegment) Equal(other SyncSegment) bool
- func (s *SyncSegment) IsEmpty() bool
- func (s *SyncSegment) IsValid() bool
- func (s SyncSegment) NewBlockRange(br aggkitcommon.BlockRange) SyncSegment
- func (s *SyncSegment) String() string
- type SyncerID
Constants ¶
This section is empty.
Variables ¶
var (
ErrFinished = fmt.Errorf("no more segments to sync")
)
Functions ¶
func IsDetectedReorgError ¶
IsDetectedReorgError checks if an error is a DetectedReorgError
func IsReorgedError ¶
IsReorgedError checks if an error is a ReorgedError
Types ¶
type BlockWithLogs ¶
type BlockWithLogs struct {
Header aggkittypes.BlockHeader
IsFinal bool
Logs []Log
}
type CompareBlockHeaders ¶
type CompareBlockHeaders struct {
BlockNumber uint64
StorageHeader *aggkittypes.BlockHeader
IsFinalized FinalizedType
RpcHeader *aggkittypes.BlockHeader
}
func (*CompareBlockHeaders) ExistsRPCBlock ¶
func (c *CompareBlockHeaders) ExistsRPCBlock() bool
func (*CompareBlockHeaders) ExistsStorageBlock ¶
func (c *CompareBlockHeaders) ExistsStorageBlock() bool
type ContractConfig ¶
type ContractConfig struct {
Address common.Address
FromBlock uint64
ToBlock aggkittypes.BlockNumberFinality
RequiredBlockHeader bool
Syncers []SyncerID
}
ContractConfig represents the configuration for a specific contract to be synced, the same as SyncerConfig but for individual contracts
func NewContractConfigFromSyncerConfig ¶
func NewContractConfigFromSyncerConfig(address common.Address, syncerConfig aggkittypes.SyncerConfig) *ContractConfig
func (*ContractConfig) Update ¶
func (c *ContractConfig) Update(syncerConfig aggkittypes.SyncerConfig)
type DetectedReorgError ¶
type DetectedReorgError struct {
OffendingBlockNumber uint64 // Important: is not the first reorged block, but one of them
OldHash common.Hash
NewHash common.Hash
ReorgDetectionReason ReorgDetectionReason
Message string
}
DetectedReorgError is an error that is raised when a reorg is detected The block is one of the blocks that were reorged, but not necessarily the first one
func CastDetectedReorgError ¶
func CastDetectedReorgError(err error) *DetectedReorgError
func NewDetectedReorgError ¶
func NewDetectedReorgError(offendingBlockNumber uint64, reason ReorgDetectionReason, oldHash, newHash common.Hash, msg string) *DetectedReorgError
NewDetectedReorgError creates a new DetectedReorgError
func (*DetectedReorgError) Error ¶
func (e *DetectedReorgError) Error() string
type FinalizedType ¶
type FinalizedType = bool
const ( NotFinalized FinalizedType = false Finalized FinalizedType = true )
type Log ¶
type Log struct {
// Consensus fields:
// address of the contract that generated the event
Address common.Address `json:"address" gencodec:"required"`
// list of topics provided by the contract.
Topics []common.Hash `json:"topics" gencodec:"required"`
// supplied by the contract, usually ABI-encoded
Data []byte `json:"data" gencodec:"required"`
// Derived fields. These fields are filled in by the node
// but not secured by consensus.
// block in which the transaction was included
BlockNumber uint64 `json:"blockNumber" rlp:"-"`
// hash of the transaction
TxHash common.Hash `json:"transactionHash" gencodec:"required" rlp:"-"`
// index of the transaction in the block
TxIndex uint `json:"transactionIndex" rlp:"-"`
// timestamp of the block in which the transaction was included
BlockTimestamp uint64 `json:"blockTimestamp" rlp:"-"`
// index of the log in the block
Index uint `json:"logIndex" rlp:"-"`
// The Removed field is true if this log was reverted due to a chain reorganisation.
// You must pay attention to this field if you receive logs through a filter query.
Removed bool `json:"removed" rlp:"-"`
}
type LogQuery ¶
type LogQuery struct {
Addrs []common.Address
BlockRange aggkitcommon.BlockRange
// If BlockHash is set BlockRange contains the corresponding blockNumber
BlockHash *common.Hash
}
LogQuery defines a query for logs
func NewLogQuery ¶
NewLogQuery creates a new LogQuery
func NewLogQueryBlockHash ¶
func NewLogQueryFromEthereumFilter ¶
func NewLogQueryFromEthereumFilter(query ethereum.FilterQuery) LogQuery
NewLogQueryFromEthereumFilter creates a new LogQuery from an Ethereum FilterQuery
func (*LogQuery) IsBlockHashQuery ¶
func (*LogQuery) IsBlockRangeQuery ¶
func (*LogQuery) ToRPCFilterQuery ¶
func (l *LogQuery) ToRPCFilterQuery() ethereum.FilterQuery
ToRPCFilterQuery converts the LogQuery to an Ethereum FilterQuery
type LogQueryResponse ¶
type LogQueryResponse struct {
Blocks []BlockWithLogs
// ResponseRange indicates the block range covered by the response, even if blocks are empty
ResponseRange aggkitcommon.BlockRange
// UnsafeRange indicates the block range that are in unsafe zone (not finalized)
UnsafeRange aggkitcommon.BlockRange
}
func (*LogQueryResponse) CountLogs ¶
func (lqr *LogQueryResponse) CountLogs() int
type ReorgData ¶
type ReorgData struct {
// ReorgID is the unique identifier for the reorg stored in DB (incremental ID)
ReorgID uint64
// BlockRangeAffected is the range of blocks affected by the reorg (from,to inclusive)
BlockRangeAffected aggkitcommon.BlockRange
// DetectedAtBlock is the block number where the reorg was detected
DetectedAtBlock uint64
DetectedTimestamp uint64
NetworkLatestBlock uint64
NetworkFinalizedBlock uint64
NetworkFinalizedBlockName aggkittypes.BlockNumberFinality
Description string
}
type ReorgDetectionReason ¶
type ReorgDetectionReason int
const ( ReorgDetectionReason_BlockHashMismatch ReorgDetectionReason = iota + 1 ReorgDetectionReason_ParentHashMismatch ReorgDetectionReason_MissingBlock // Forced act as MissingBlock but without checking it is basically a debug mode // to produce reorgs scenario (it must have enabled the develMode) ReorgDetectionReason_Forced )
func (ReorgDetectionReason) String ¶
func (r ReorgDetectionReason) String() string
type ReorgPorter ¶
type ReorgPorter interface {
NewTx(ctx context.Context) (dbtypes.Txer, error)
GetBlockStorageAndRPC(ctx context.Context, tx dbtypes.Querier, blockNumber uint64) (*CompareBlockHeaders, error)
GetLastBlockNumberInStorage(tx dbtypes.Querier) (uint64, error)
// Return ChainID of the inserted reorg
MoveReorgedBlocks(tx dbtypes.Querier, reorgData ReorgData) (uint64, error)
GetBlockNumberInRPC(ctx context.Context, blockFinality aggkittypes.BlockNumberFinality) (uint64, error)
TimeNowUnix() uint64
}
type ReorgProcessor ¶
type ReorgProcessor interface {
// ProcessReorg processes a detected reorg starting from the offending block number.
// It identifies the range of blocks affected by the reorg and takes necessary actions
// to handle the reorganization.
// input parameters:
// - ctx: the context for managing cancellation and timeouts
// - detectedReorgError: the error returned by the reorg detection logic, containing
// the offending block number and the reason for the reorg detection
// - finalizedBlockTag: the block tag to consider as finalized (typically finalizedBlock)
ProcessReorg(ctx context.Context, detectedReorgError DetectedReorgError,
finalizedBlockTag aggkittypes.BlockNumberFinality) error
}
type ReorgedError ¶
type ReorgedError struct {
Message string
BlockRangeReorged aggkitcommon.BlockRange
ReorgID uint64
}
func CastReorgedError ¶
func CastReorgedError(err error) *ReorgedError
func NewReorgedError ¶
func NewReorgedError(blockRangeReorged aggkitcommon.BlockRange, reorgID uint64, msg string) *ReorgedError
func (*ReorgedError) Error ¶
func (e *ReorgedError) Error() string
type SetSyncSegment ¶
type SetSyncSegment struct {
// contains filtered or unexported fields
}
func NewSetSyncSegment ¶
func NewSetSyncSegment() SetSyncSegment
NewSetSyncSegment creates a new empty SetSyncSegment
func NewSetSyncSegmentFromLogQuery ¶
func NewSetSyncSegmentFromLogQuery(logQuery *LogQuery) (SetSyncSegment, error)
NewSetSyncSegmentFromLogQuery creates a new SetSyncSegment from a LogQuery
func (*SetSyncSegment) Add ¶
func (s *SetSyncSegment) Add(segment SyncSegment)
Add adds a new SyncSegment to the SetSyncSegment, merging block ranges if the contract address already exists
func (*SetSyncSegment) AddLogQuery ¶
func (f *SetSyncSegment) AddLogQuery(logQuery *LogQuery) error
AddLogQuery adds all segments from the LogQuery to the SetSyncSegment used to update the syncedSegments after a successful FilterLogs
func (*SetSyncSegment) Clone ¶
func (f *SetSyncSegment) Clone() *SetSyncSegment
func (*SetSyncSegment) Empty ¶
func (f *SetSyncSegment) Empty(segment *SyncSegment)
func (*SetSyncSegment) Finished ¶
func (f *SetSyncSegment) Finished() bool
func (*SetSyncSegment) GetAddressesForBlock ¶
func (f *SetSyncSegment) GetAddressesForBlock(blockNumber uint64) []common.Address
func (*SetSyncSegment) GetAddressesForBlockRange ¶
func (f *SetSyncSegment) GetAddressesForBlockRange(blockRange aggkitcommon.BlockRange) []common.Address
func (*SetSyncSegment) GetByContract ¶
func (s *SetSyncSegment) GetByContract(addr common.Address) (SyncSegment, bool)
GetByContract returns the SyncSegment for the given contract address it returns true if it exists, otherwise it returns false
func (*SetSyncSegment) GetContracts ¶
func (s *SetSyncSegment) GetContracts() []common.Address
GetContracts returns the list of contract addresses in the SetSyncSegment
func (*SetSyncSegment) GetHighestBlockNumber ¶
func (f *SetSyncSegment) GetHighestBlockNumber() (uint64, aggkittypes.BlockNumberFinality)
func (*SetSyncSegment) GetLowestFromBlockSegment ¶
func (f *SetSyncSegment) GetLowestFromBlockSegment() *SyncSegment
func (*SetSyncSegment) GetSegments ¶
func (s *SetSyncSegment) GetSegments() []SyncSegment
func (*SetSyncSegment) GetTargetToBlockTags ¶
func (f *SetSyncSegment) GetTargetToBlockTags() []aggkittypes.BlockNumberFinality
GetTargetToBlockTags returns the list of TargetToBlock tags in the SetSyncSegment witout duplicates
func (*SetSyncSegment) GetTotalPendingBlockRange ¶
func (f *SetSyncSegment) GetTotalPendingBlockRange() *aggkitcommon.BlockRange
func (*SetSyncSegment) IsAvailable ¶
func (f *SetSyncSegment) IsAvailable(query LogQuery) bool
IsAvailable checks if the required LogQuery data is already synced
func (*SetSyncSegment) IsPartiallyAvailable ¶
func (f *SetSyncSegment) IsPartiallyAvailable(query LogQuery) (bool, *LogQuery)
IsPartiallyAvailable checks if some part of the LogQuery is already synced always starting from FromBlock If there are any data avaible, it returns true and the LogQuery with the available data
func (*SetSyncSegment) NextQuery ¶
func (f *SetSyncSegment) NextQuery(syncBlockChunkSize uint32, maxBlockNumber uint64, applyMaxBlockNumber bool) (*LogQuery, error)
NextQuery generates the next LogQuery to sync based on the lowest FromBlock pending to synchronize
func (*SetSyncSegment) Remove ¶
func (f *SetSyncSegment) Remove(segmentToRemove *SyncSegment)
func (*SetSyncSegment) SegmentsByContract ¶
func (s *SetSyncSegment) SegmentsByContract(addrs []common.Address) []SyncSegment
SegmentsByContract returns segments for the given contract addresses
func (*SetSyncSegment) String ¶
func (s *SetSyncSegment) String() string
String returns a string representation of the SetSyncSegment
func (*SetSyncSegment) SubtractLogQuery ¶
func (f *SetSyncSegment) SubtractLogQuery(logQuery *LogQuery) error
SubtractLogQuery removes the block ranges defined in the logQuery from the current SetSyncSegment This is used to update the pendingSync after doing a FilterLogs query
func (*SetSyncSegment) SubtractSegments ¶
func (f *SetSyncSegment) SubtractSegments(segments *SetSyncSegment) error
SubtractSegments removes the block ranges defined in segments from the current SetSyncSegment This is the pending data to synchronize
func (*SetSyncSegment) TotalBlocks ¶
func (f *SetSyncSegment) TotalBlocks() uint64
TotalBlocks returns the total number pending blocks to synchronize
func (*SetSyncSegment) UpdateBlockRange ¶
func (f *SetSyncSegment) UpdateBlockRange(segment *SyncSegment, newBlockRange aggkitcommon.BlockRange)
type SetSyncerConfig ¶
type SetSyncerConfig struct {
// contains filtered or unexported fields
}
func NewSetSyncerConfig ¶
func NewSetSyncerConfig() SetSyncerConfig
func (*SetSyncerConfig) Add ¶
func (f *SetSyncerConfig) Add(filter aggkittypes.SyncerConfig)
func (*SetSyncerConfig) Addresses ¶
func (f *SetSyncerConfig) Addresses(blockRange aggkitcommon.BlockRange) []common.Address
Addresses returns the unique list of contract addresses from all filters within the specified block range TODO: check blockRange
func (*SetSyncerConfig) Brief ¶
func (f *SetSyncerConfig) Brief() string
func (*SetSyncerConfig) ContractConfigs ¶
func (f *SetSyncerConfig) ContractConfigs() []ContractConfig
ContractConfigs combines the SyncerConfig into ContractConfig per contract address
func (*SetSyncerConfig) GetTargetToBlockTags ¶
func (f *SetSyncerConfig) GetTargetToBlockTags() []aggkittypes.BlockNumberFinality
GetTargetToBlockTags returns the list of TargetToBlock tags in the SetSyncSegment witout duplicates
func (*SetSyncerConfig) SyncSegments ¶
func (f *SetSyncerConfig) SyncSegments( blockNumbers map[aggkittypes.BlockNumberFinality]uint64) (*SetSyncSegment, error)
SyncSegments groups the SetSyncerConfig into segments per contract address and blockRange
type Storager ¶
type Storager interface {
StoragerForReorg
dbtypes.KeyValueStorager
// GetSyncedBlockRangePerContract It returns the synced block range stored in DB
GetSyncedBlockRangePerContract(tx dbtypes.Querier) (SetSyncSegment, error)
SaveEthLogsWithHeaders(tx dbtypes.Querier, blockHeaders aggkittypes.ListBlockHeaders,
logs []types.Log, isFinal bool) error
// TODO: Deprecate GetEthLogs and use LogQuery instead
GetEthLogs(tx dbtypes.Querier, query LogQuery) ([]types.Log, error)
LogQuery(tx dbtypes.Querier, query LogQuery) (LogQueryResponse, error)
UpdateSyncedStatus(tx dbtypes.Querier, segments []SyncSegment) error
UpsertSyncerConfigs(tx dbtypes.Querier, configs []ContractConfig) error
GetBlockHeaderByNumber(tx dbtypes.Querier, blockNumber uint64) (*aggkittypes.BlockHeader, FinalizedType, error)
NewTx(ctx context.Context) (dbtypes.Txer, error)
// GetBlockHeadersNotFinalized retrieves all block headers that are not finalized <= maxBlock
// if maxBlock is nil, retrieves all not finalized blocks
GetBlockHeadersNotFinalized(tx dbtypes.Querier, maxBlock *uint64) (aggkittypes.ListBlockHeaders, error)
UpdateBlockToFinalized(tx dbtypes.Querier, blockNumbers []uint64) error
GetRangeBlockHeader(tx dbtypes.Querier, isFinal FinalizedType) (lowest *aggkittypes.BlockHeader,
highest *aggkittypes.BlockHeader, err error)
// GetHighestBlockNumber returns the highest block number stored in db
GetHighestBlockNumber(tx dbtypes.Querier) (uint64, error)
// GetBlockReorgedReorgID returns the reorgID of the reorged block if exists
// second return value indicates if the block is reorged
GetBlockReorgedReorgID(tx dbtypes.Querier,
blockNumber uint64, blockHash common.Hash) (uint64, bool, error)
GetReorgedDataByReorgID(tx dbtypes.Querier,
reorgID uint64) (*ReorgData, error)
}
type StoragerForReorg ¶
type StoragerForReorg interface {
GetBlockHeaderByNumber(tx dbtypes.Querier, blockNumber uint64) (*aggkittypes.BlockHeader, FinalizedType, error)
InsertReorgAndMoveReorgedBlocksAndLogs(tx dbtypes.Querier, reorgData ReorgData) (uint64, error)
}
type SyncSegment ¶
type SyncSegment struct {
ContractAddr common.Address
// BlockRange can be empty BlockRange.IsEmpty()
BlockRange aggkitcommon.BlockRange
TargetToBlock aggkittypes.BlockNumberFinality
}
SyncSegment represents a segment of blocks, it is used for synced segments but also for representing segments to be synced
func NewSyncSegment ¶
func NewSyncSegment(contractAddr common.Address, blockRange aggkitcommon.BlockRange, targetToBlock aggkittypes.BlockNumberFinality, requiredBlockHeader bool) SyncSegment
NewSyncSegment creates a new SyncSegment
func (*SyncSegment) Clone ¶
func (s *SyncSegment) Clone() *SyncSegment
Clone creates a deep copy of the SyncSegment
func (*SyncSegment) Empty ¶
func (s *SyncSegment) Empty()
Empty sets the SyncSegment (fromBlock > toBlock) to indicate it is empty
func (SyncSegment) Equal ¶
func (s SyncSegment) Equal(other SyncSegment) bool
Equal checks if two SyncSegments are equal
func (*SyncSegment) IsEmpty ¶
func (s *SyncSegment) IsEmpty() bool
func (*SyncSegment) IsValid ¶
func (s *SyncSegment) IsValid() bool
There are special values like BlockRange(0,0) that we want to consider invalid for multidownloader, so we need this method to check the validity of the SyncSegment
func (SyncSegment) NewBlockRange ¶
func (s SyncSegment) NewBlockRange(br aggkitcommon.BlockRange) SyncSegment
NewBlockRange creates a new SyncSegment changing only the BlockRange
func (*SyncSegment) String ¶
func (s *SyncSegment) String() string
String returns a string representation of the SyncSegment