types

package
v0.10.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: Apache-2.0, MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrFinished = fmt.Errorf("no more segments to sync")
)

Functions

func IsDetectedReorgError

func IsDetectedReorgError(err error) bool

IsDetectedReorgError checks if an error is a DetectedReorgError

func IsReorgedError

func IsReorgedError(err error) bool

IsReorgedError checks if an error is a ReorgedError

Types

type BlockWithLogs

type BlockWithLogs struct {
	Header  aggkittypes.BlockHeader
	IsFinal bool
	Logs    []Log
}

type CompareBlockHeaders

type CompareBlockHeaders struct {
	BlockNumber   uint64
	StorageHeader *aggkittypes.BlockHeader
	IsFinalized   FinalizedType
	RpcHeader     *aggkittypes.BlockHeader
}

func (*CompareBlockHeaders) ExistsRPCBlock

func (c *CompareBlockHeaders) ExistsRPCBlock() bool

func (*CompareBlockHeaders) ExistsStorageBlock

func (c *CompareBlockHeaders) ExistsStorageBlock() bool

type ContractConfig

type ContractConfig struct {
	Address             common.Address
	FromBlock           uint64
	ToBlock             aggkittypes.BlockNumberFinality
	RequiredBlockHeader bool
	Syncers             []SyncerID
}

ContractConfig represents the configuration for a specific contract to be synced, the same as SyncerConfig but for individual contracts

func NewContractConfigFromSyncerConfig

func NewContractConfigFromSyncerConfig(address common.Address,
	syncerConfig aggkittypes.SyncerConfig) *ContractConfig

func (*ContractConfig) Update

func (c *ContractConfig) Update(syncerConfig aggkittypes.SyncerConfig)

type DetectedReorgError

type DetectedReorgError struct {
	OffendingBlockNumber uint64 // Important: is not the first reorged block, but one of them
	OldHash              common.Hash
	NewHash              common.Hash
	ReorgDetectionReason ReorgDetectionReason
	Message              string
}

DetectedReorgError is an error that is raised when a reorg is detected The block is one of the blocks that were reorged, but not necessarily the first one

func CastDetectedReorgError

func CastDetectedReorgError(err error) *DetectedReorgError

func NewDetectedReorgError

func NewDetectedReorgError(offendingBlockNumber uint64,
	reason ReorgDetectionReason,
	oldHash, newHash common.Hash, msg string) *DetectedReorgError

NewDetectedReorgError creates a new DetectedReorgError

func (*DetectedReorgError) Error

func (e *DetectedReorgError) Error() string

type FinalizedType

type FinalizedType = bool
const (
	NotFinalized FinalizedType = false
	Finalized    FinalizedType = true
)

type Log

type Log struct {
	// Consensus fields:
	// address of the contract that generated the event
	Address common.Address `json:"address" gencodec:"required"`
	// list of topics provided by the contract.
	Topics []common.Hash `json:"topics" gencodec:"required"`
	// supplied by the contract, usually ABI-encoded
	Data []byte `json:"data" gencodec:"required"`

	// Derived fields. These fields are filled in by the node
	// but not secured by consensus.
	// block in which the transaction was included
	BlockNumber uint64 `json:"blockNumber" rlp:"-"`
	// hash of the transaction
	TxHash common.Hash `json:"transactionHash" gencodec:"required" rlp:"-"`
	// index of the transaction in the block
	TxIndex uint `json:"transactionIndex" rlp:"-"`
	// timestamp of the block in which the transaction was included
	BlockTimestamp uint64 `json:"blockTimestamp" rlp:"-"`
	// index of the log in the block
	Index uint `json:"logIndex" rlp:"-"`

	// The Removed field is true if this log was reverted due to a chain reorganisation.
	// You must pay attention to this field if you receive logs through a filter query.
	Removed bool `json:"removed" rlp:"-"`
}

type LogQuery

type LogQuery struct {
	Addrs      []common.Address
	BlockRange aggkitcommon.BlockRange
	// If BlockHash is set BlockRange contains the corresponding blockNumber
	BlockHash *common.Hash
}

LogQuery defines a query for logs

func NewLogQuery

func NewLogQuery(fromBlock uint64, toBlock uint64, addrs []common.Address) LogQuery

NewLogQuery creates a new LogQuery

func NewLogQueryBlockHash

func NewLogQueryBlockHash(blockNumber uint64, blockHash common.Hash, addrs []common.Address) LogQuery

func NewLogQueryFromEthereumFilter

func NewLogQueryFromEthereumFilter(query ethereum.FilterQuery) LogQuery

NewLogQueryFromEthereumFilter creates a new LogQuery from an Ethereum FilterQuery

func (*LogQuery) IsBlockHashQuery

func (l *LogQuery) IsBlockHashQuery() bool

func (*LogQuery) IsBlockRangeQuery

func (l *LogQuery) IsBlockRangeQuery() bool

func (*LogQuery) IsEmpty

func (l *LogQuery) IsEmpty() bool

func (*LogQuery) IsValid

func (l *LogQuery) IsValid() bool

func (*LogQuery) String

func (l *LogQuery) String() string

String returns a string representation of the LogQuery

func (*LogQuery) ToRPCFilterQuery

func (l *LogQuery) ToRPCFilterQuery() ethereum.FilterQuery

ToRPCFilterQuery converts the LogQuery to an Ethereum FilterQuery

type LogQueryResponse

type LogQueryResponse struct {
	Blocks []BlockWithLogs
	// ResponseRange indicates the block range covered by the response, even if blocks are empty
	ResponseRange aggkitcommon.BlockRange
	// UnsafeRange indicates the block range that are in unsafe zone (not finalized)
	UnsafeRange aggkitcommon.BlockRange
}

func (*LogQueryResponse) CountLogs

func (lqr *LogQueryResponse) CountLogs() int

type ReorgData

type ReorgData struct {
	// ReorgID is the unique identifier for the reorg stored in DB (incremental ID)
	ReorgID uint64
	// BlockRangeAffected is the range of blocks affected by the reorg (from,to inclusive)
	BlockRangeAffected aggkitcommon.BlockRange
	// DetectedAtBlock is the block number where the reorg was detected
	DetectedAtBlock           uint64
	DetectedTimestamp         uint64
	NetworkLatestBlock        uint64
	NetworkFinalizedBlock     uint64
	NetworkFinalizedBlockName aggkittypes.BlockNumberFinality
	Description               string
}

func (*ReorgData) String

func (r *ReorgData) String() string

type ReorgDetectionReason

type ReorgDetectionReason int
const (
	ReorgDetectionReason_BlockHashMismatch ReorgDetectionReason = iota + 1
	ReorgDetectionReason_ParentHashMismatch
	ReorgDetectionReason_MissingBlock
	// Forced act as MissingBlock but without checking it is basically a debug mode
	// to produce reorgs scenario (it must have enabled the develMode)
	ReorgDetectionReason_Forced
)

func (ReorgDetectionReason) String

func (r ReorgDetectionReason) String() string

type ReorgPorter

type ReorgPorter interface {
	NewTx(ctx context.Context) (dbtypes.Txer, error)
	GetBlockStorageAndRPC(ctx context.Context, tx dbtypes.Querier, blockNumber uint64) (*CompareBlockHeaders, error)
	GetLastBlockNumberInStorage(tx dbtypes.Querier) (uint64, error)
	// Return ChainID of the inserted reorg
	MoveReorgedBlocks(tx dbtypes.Querier, reorgData ReorgData) (uint64, error)
	GetBlockNumberInRPC(ctx context.Context, blockFinality aggkittypes.BlockNumberFinality) (uint64, error)
	TimeNowUnix() uint64
}

type ReorgProcessor

type ReorgProcessor interface {
	// ProcessReorg processes a detected reorg starting from the offending block number.
	// It identifies the range of blocks affected by the reorg and takes necessary actions
	// to handle the reorganization.
	// input parameters:
	// - ctx: the context for managing cancellation and timeouts
	// - detectedReorgError: the error returned by the reorg detection logic, containing
	//   the offending block number and the reason for the reorg detection
	// - finalizedBlockTag: the block tag to consider as finalized (typically finalizedBlock)
	ProcessReorg(ctx context.Context, detectedReorgError DetectedReorgError,
		finalizedBlockTag aggkittypes.BlockNumberFinality) error
}

type ReorgedError

type ReorgedError struct {
	Message           string
	BlockRangeReorged aggkitcommon.BlockRange
	ReorgID           uint64
}

func CastReorgedError

func CastReorgedError(err error) *ReorgedError

func NewReorgedError

func NewReorgedError(blockRangeReorged aggkitcommon.BlockRange,
	reorgID uint64,
	msg string) *ReorgedError

func (*ReorgedError) Error

func (e *ReorgedError) Error() string

type SetSyncSegment

type SetSyncSegment struct {
	// contains filtered or unexported fields
}

func NewSetSyncSegment

func NewSetSyncSegment() SetSyncSegment

NewSetSyncSegment creates a new empty SetSyncSegment

func NewSetSyncSegmentFromLogQuery

func NewSetSyncSegmentFromLogQuery(logQuery *LogQuery) (SetSyncSegment, error)

NewSetSyncSegmentFromLogQuery creates a new SetSyncSegment from a LogQuery

func (*SetSyncSegment) Add

func (s *SetSyncSegment) Add(segment SyncSegment)

Add adds a new SyncSegment to the SetSyncSegment, merging block ranges if the contract address already exists

func (*SetSyncSegment) AddLogQuery

func (f *SetSyncSegment) AddLogQuery(logQuery *LogQuery) error

AddLogQuery adds all segments from the LogQuery to the SetSyncSegment used to update the syncedSegments after a successful FilterLogs

func (*SetSyncSegment) Clone

func (f *SetSyncSegment) Clone() *SetSyncSegment

func (*SetSyncSegment) Empty

func (f *SetSyncSegment) Empty(segment *SyncSegment)

func (*SetSyncSegment) Finished

func (f *SetSyncSegment) Finished() bool

func (*SetSyncSegment) GetAddressesForBlock

func (f *SetSyncSegment) GetAddressesForBlock(blockNumber uint64) []common.Address

func (*SetSyncSegment) GetAddressesForBlockRange

func (f *SetSyncSegment) GetAddressesForBlockRange(blockRange aggkitcommon.BlockRange) []common.Address

func (*SetSyncSegment) GetByContract

func (s *SetSyncSegment) GetByContract(addr common.Address) (SyncSegment, bool)

GetByContract returns the SyncSegment for the given contract address it returns true if it exists, otherwise it returns false

func (*SetSyncSegment) GetContracts

func (s *SetSyncSegment) GetContracts() []common.Address

GetContracts returns the list of contract addresses in the SetSyncSegment

func (*SetSyncSegment) GetHighestBlockNumber

func (f *SetSyncSegment) GetHighestBlockNumber() (uint64, aggkittypes.BlockNumberFinality)

func (*SetSyncSegment) GetLowestFromBlockSegment

func (f *SetSyncSegment) GetLowestFromBlockSegment() *SyncSegment

func (*SetSyncSegment) GetSegments

func (s *SetSyncSegment) GetSegments() []SyncSegment

func (*SetSyncSegment) GetTargetToBlockTags

func (f *SetSyncSegment) GetTargetToBlockTags() []aggkittypes.BlockNumberFinality

GetTargetToBlockTags returns the list of TargetToBlock tags in the SetSyncSegment witout duplicates

func (*SetSyncSegment) GetTotalPendingBlockRange

func (f *SetSyncSegment) GetTotalPendingBlockRange() *aggkitcommon.BlockRange

func (*SetSyncSegment) IsAvailable

func (f *SetSyncSegment) IsAvailable(query LogQuery) bool

IsAvailable checks if the required LogQuery data is already synced

func (*SetSyncSegment) IsPartiallyAvailable

func (f *SetSyncSegment) IsPartiallyAvailable(query LogQuery) (bool, *LogQuery)

IsPartiallyAvailable checks if some part of the LogQuery is already synced always starting from FromBlock If there are any data avaible, it returns true and the LogQuery with the available data

func (*SetSyncSegment) NextQuery

func (f *SetSyncSegment) NextQuery(syncBlockChunkSize uint32,
	maxBlockNumber uint64, applyMaxBlockNumber bool) (*LogQuery, error)

NextQuery generates the next LogQuery to sync based on the lowest FromBlock pending to synchronize

func (*SetSyncSegment) Remove

func (f *SetSyncSegment) Remove(segmentToRemove *SyncSegment)

func (*SetSyncSegment) SegmentsByContract

func (s *SetSyncSegment) SegmentsByContract(addrs []common.Address) []SyncSegment

SegmentsByContract returns segments for the given contract addresses

func (*SetSyncSegment) String

func (s *SetSyncSegment) String() string

String returns a string representation of the SetSyncSegment

func (*SetSyncSegment) SubtractLogQuery

func (f *SetSyncSegment) SubtractLogQuery(logQuery *LogQuery) error

SubtractLogQuery removes the block ranges defined in the logQuery from the current SetSyncSegment This is used to update the pendingSync after doing a FilterLogs query

func (*SetSyncSegment) SubtractSegments

func (f *SetSyncSegment) SubtractSegments(segments *SetSyncSegment) error

SubtractSegments removes the block ranges defined in segments from the current SetSyncSegment This is the pending data to synchronize

func (*SetSyncSegment) TotalBlocks

func (f *SetSyncSegment) TotalBlocks() uint64

TotalBlocks returns the total number pending blocks to synchronize

func (*SetSyncSegment) UpdateBlockRange

func (f *SetSyncSegment) UpdateBlockRange(segment *SyncSegment, newBlockRange aggkitcommon.BlockRange)

type SetSyncerConfig

type SetSyncerConfig struct {
	// contains filtered or unexported fields
}

func NewSetSyncerConfig

func NewSetSyncerConfig() SetSyncerConfig

func (*SetSyncerConfig) Add

func (f *SetSyncerConfig) Add(filter aggkittypes.SyncerConfig)

func (*SetSyncerConfig) Addresses

func (f *SetSyncerConfig) Addresses(blockRange aggkitcommon.BlockRange) []common.Address

Addresses returns the unique list of contract addresses from all filters within the specified block range TODO: check blockRange

func (*SetSyncerConfig) Brief

func (f *SetSyncerConfig) Brief() string

func (*SetSyncerConfig) ContractConfigs

func (f *SetSyncerConfig) ContractConfigs() []ContractConfig

ContractConfigs combines the SyncerConfig into ContractConfig per contract address

func (*SetSyncerConfig) GetTargetToBlockTags

func (f *SetSyncerConfig) GetTargetToBlockTags() []aggkittypes.BlockNumberFinality

GetTargetToBlockTags returns the list of TargetToBlock tags in the SetSyncSegment witout duplicates

func (*SetSyncerConfig) SyncSegments

func (f *SetSyncerConfig) SyncSegments(
	blockNumbers map[aggkittypes.BlockNumberFinality]uint64) (*SetSyncSegment, error)

SyncSegments groups the SetSyncerConfig into segments per contract address and blockRange

type Storager

type Storager interface {
	StoragerForReorg
	dbtypes.KeyValueStorager
	// GetSyncedBlockRangePerContract It returns the synced block range stored in DB
	GetSyncedBlockRangePerContract(tx dbtypes.Querier) (SetSyncSegment, error)
	SaveEthLogsWithHeaders(tx dbtypes.Querier, blockHeaders aggkittypes.ListBlockHeaders,
		logs []types.Log, isFinal bool) error
	// TODO: Deprecate GetEthLogs and use LogQuery instead
	GetEthLogs(tx dbtypes.Querier, query LogQuery) ([]types.Log, error)
	LogQuery(tx dbtypes.Querier, query LogQuery) (LogQueryResponse, error)
	UpdateSyncedStatus(tx dbtypes.Querier, segments []SyncSegment) error
	UpsertSyncerConfigs(tx dbtypes.Querier, configs []ContractConfig) error
	GetBlockHeaderByNumber(tx dbtypes.Querier, blockNumber uint64) (*aggkittypes.BlockHeader, FinalizedType, error)
	NewTx(ctx context.Context) (dbtypes.Txer, error)
	// GetBlockHeadersNotFinalized retrieves all block headers that are not finalized <= maxBlock
	// if maxBlock is nil, retrieves all not finalized blocks
	GetBlockHeadersNotFinalized(tx dbtypes.Querier, maxBlock *uint64) (aggkittypes.ListBlockHeaders, error)
	UpdateBlockToFinalized(tx dbtypes.Querier, blockNumbers []uint64) error
	GetRangeBlockHeader(tx dbtypes.Querier, isFinal FinalizedType) (lowest *aggkittypes.BlockHeader,
		highest *aggkittypes.BlockHeader, err error)
	// GetHighestBlockNumber returns the highest block number stored in db
	GetHighestBlockNumber(tx dbtypes.Querier) (uint64, error)
	// GetBlockReorgedReorgID returns the reorgID of the reorged block if exists
	// second return value indicates if the block is reorged
	GetBlockReorgedReorgID(tx dbtypes.Querier,
		blockNumber uint64, blockHash common.Hash) (uint64, bool, error)
	GetReorgedDataByReorgID(tx dbtypes.Querier,
		reorgID uint64) (*ReorgData, error)
}

type StoragerForReorg

type StoragerForReorg interface {
	GetBlockHeaderByNumber(tx dbtypes.Querier, blockNumber uint64) (*aggkittypes.BlockHeader, FinalizedType, error)
	InsertReorgAndMoveReorgedBlocksAndLogs(tx dbtypes.Querier, reorgData ReorgData) (uint64, error)
}

type SyncSegment

type SyncSegment struct {
	ContractAddr common.Address
	// BlockRange can be empty  BlockRange.IsEmpty()
	BlockRange    aggkitcommon.BlockRange
	TargetToBlock aggkittypes.BlockNumberFinality
}

SyncSegment represents a segment of blocks, it is used for synced segments but also for representing segments to be synced

func NewSyncSegment

func NewSyncSegment(contractAddr common.Address,
	blockRange aggkitcommon.BlockRange,
	targetToBlock aggkittypes.BlockNumberFinality,
	requiredBlockHeader bool) SyncSegment

NewSyncSegment creates a new SyncSegment

func (*SyncSegment) Clone

func (s *SyncSegment) Clone() *SyncSegment

Clone creates a deep copy of the SyncSegment

func (*SyncSegment) Empty

func (s *SyncSegment) Empty()

Empty sets the SyncSegment (fromBlock > toBlock) to indicate it is empty

func (SyncSegment) Equal

func (s SyncSegment) Equal(other SyncSegment) bool

Equal checks if two SyncSegments are equal

func (*SyncSegment) IsEmpty

func (s *SyncSegment) IsEmpty() bool

func (*SyncSegment) IsValid

func (s *SyncSegment) IsValid() bool

There are special values like BlockRange(0,0) that we want to consider invalid for multidownloader, so we need this method to check the validity of the SyncSegment

func (SyncSegment) NewBlockRange

func (s SyncSegment) NewBlockRange(br aggkitcommon.BlockRange) SyncSegment

NewBlockRange creates a new SyncSegment changing only the BlockRange

func (*SyncSegment) String

func (s *SyncSegment) String() string

String returns a string representation of the SyncSegment

type SyncerID

type SyncerID = string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL