Documentation
¶
Index ¶
- Constants
- type CollectionIndexer
- type ExecutionDataSyncer
- type Indexer
- func (ci *Indexer) IndexCollections(collections []*flow.Collection) error
- func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)
- func (ci *Indexer) OnCollectionReceived(collection *flow.Collection)
- func (ci *Indexer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)
- type Syncer
Constants ¶
const ( // DefaultCollectionCatchupTimeout is the timeout for the initial collection catchup process // during startup. DefaultCollectionCatchupTimeout = 30 * time.Second // DefaultCollectionCatchupDBPollInterval is the interval at which the storage is polled to check // if missing collections have been received during the initial collection catchup process. DefaultCollectionCatchupDBPollInterval = 1 * time.Second // DefaultMissingCollectionRequestInterval is the interval at which the syncer checks missing collections // and re-requests them from the network if needed. DefaultMissingCollectionRequestInterval = 1 * time.Minute // DefaultMissingCollectionRequestThreshold is the block height threshold below which collections // should be re-requested, regardless of the number of blocks for which collection are missing. // This is to ensure that if a collection is missing for a long time (in terms of block height) // it is eventually re-requested. DefaultMissingCollectionRequestThreshold = 100 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CollectionIndexer ¶
type CollectionIndexer interface {
// OnCollectionReceived notifies the collection indexer that a new collection is available to be indexed.
// Calling this method multiple times with the same collection is a no-op.
// This method is non-blocking.
OnCollectionReceived(collection *flow.Collection)
// IndexCollections indexes a set of collections, skipping any collections which already exist in storage.
// Calling this method multiple times with the same collections is a no-op.
//
// No error returns are expected during normal operation.
IndexCollections(collections []*flow.Collection) error
// MissingCollectionsAtHeight returns all collections that are not present in storage for a specific
// finalized block height.
//
// Expected error returns during normal operation:
// - [storage.ErrNotFound]: if provided block height is not finalized or below this node's root block
MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)
}
type ExecutionDataSyncer ¶
type ExecutionDataSyncer struct {
// contains filtered or unexported fields
}
ExecutionDataSyncer submits collections from execution data to the collections indexer. It is designed to be used within the collection syncer to optimize indexing when collection data is already available on the node.
func NewExecutionDataSyncer ¶
func NewExecutionDataSyncer( log zerolog.Logger, executionDataCache execution_data.ExecutionDataCache, indexer CollectionIndexer, ) *ExecutionDataSyncer
func (*ExecutionDataSyncer) IndexForHeight ¶
IndexForHeight indexes the collections for a given finalized block height using locally available execution data. Returns false and no error if execution data for the block is not available.
Expected error returns during normal operation:
- context.Canceled: if the context is canceled before the collections are indexed.
type Indexer ¶
type Indexer struct {
// contains filtered or unexported fields
}
Indexer stores and indexes collections received from the network. It is designed to be the central point for accumulating collections from various subsystems that my receive them from the network. For example, collections may be received from execution data sync, the collection syncer, or the execution state indexer. Depending on the node's configuration, one or more of these subsystems will feed the indexer with collections.
The indexer also maintains the last full block height state, which is the highest block height for which all collections are stored and indexed.
func NewIndexer ¶
func NewIndexer( log zerolog.Logger, db storage.DB, metrics module.CollectionExecutedMetric, state protocol.State, blocks storage.Blocks, collections storage.Collections, lastFullBlockHeight *counters.PersistentStrictMonotonicCounter, lockManager lockctx.Manager, ) (*Indexer, error)
NewIndexer creates a new Indexer.
No error returns are expected during normal operation.
func (*Indexer) IndexCollections ¶
func (ci *Indexer) IndexCollections(collections []*flow.Collection) error
IndexCollections indexes a set of collections, skipping any collections which already exist in storage. Calling this method multiple times with the same collections is a no-op.
No error returns are expected during normal operation.
func (*Indexer) MissingCollectionsAtHeight ¶
func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)
MissingCollectionsAtHeight returns all collections that are not present in storage for a specific finalized block height.
Expected error returns during normal operation:
- storage.ErrNotFound: if provided block height is not finalized
func (*Indexer) OnCollectionReceived ¶
func (ci *Indexer) OnCollectionReceived(collection *flow.Collection)
OnCollectionReceived notifies the collection indexer that a new collection is available to be indexed. Calling this method multiple times with the same collection is a no-op. This method is non-blocking.
func (*Indexer) WorkerLoop ¶
func (ci *Indexer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)
WorkerLoop is a component.ComponentWorker that continuously processes collections submitted to the indexer and maintains the last full block height state.
There should only be a single instance of this method running at a time.
type Syncer ¶
type Syncer struct {
// contains filtered or unexported fields
}
The Syncer is responsible for syncing collections for finalized blocks from the network. It has three main responsibilities:
- Handle requests for collections for finalized blocks from the ingestion engine by submitting the requests to Collection nodes.
- Track blocks with missing collections, and periodically re-request the missing collections from the network.
- Submit collections received to the Indexer for storage and indexing.
The Syncer guarantees that all collections for finalized blocks will eventually be received as long as there are honest Collection nodes in each cluster, and the node is able to successfully communicate with them over the networking layer.
It is responsible for ensuring the local node has all collections contained within finalized blocks starting from the last fully synced height. It works by periodically scanning the finalized block range from the last full block height up to the latest finalized block height, identifying missing collections, and triggering requests to fetch them from the network. Once collections are retrieved, it submits them to the Indexer for storage and indexing.
It is meant to operate in a background goroutine as part of the node's ingestion pipeline.
func NewSyncer ¶
func NewSyncer( log zerolog.Logger, requester module.Requester, state protocol.State, collections storage.Collections, lastFullBlockHeight counters.Reader, collectionIndexer CollectionIndexer, execDataSyncer *ExecutionDataSyncer, ) *Syncer
NewSyncer creates a new Syncer responsible for requesting, tracking, and indexing missing collections.
func (*Syncer) OnCollectionDownloaded ¶
func (s *Syncer) OnCollectionDownloaded(_ flow.Identifier, entity flow.Entity)
OnCollectionDownloaded notifies the collection syncer that a collection has been downloaded. This callback implements [requester.HandleFunc] and is intended to be used with the requester engine. Panics if the provided entity is not a flow.Collection.
func (*Syncer) RequestCollectionsForBlock ¶
func (s *Syncer) RequestCollectionsForBlock(height uint64, missingCollections []*flow.CollectionGuarantee) error
RequestCollectionsForBlock conditionally requests missing collections for a specific block height, skipping requests if the block is already below the known last full block height.
No error returns are expected during normal operation.
func (*Syncer) WorkerLoop ¶
func (s *Syncer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)
WorkerLoop is a component.ComponentWorker that continuously monitors for missing collections, and requests them from the network if needed. It also performs an initial collection catchup on startup.