collections

v0.43.4-access-ingesti...

Warning: This package is not in the latest version of its module.

Published: Nov 4, 2025 License: AGPL-3.0 Imports: 19 Imported by: 0

Documentation

Constants

const (
	// DefaultCollectionCatchupTimeout is the timeout for the initial collection catchup process
	// during startup.
	DefaultCollectionCatchupTimeout = 30 * time.Second

	// DefaultCollectionCatchupDBPollInterval is the interval at which the storage is polled to check
	// if missing collections have been received during the initial collection catchup process.
	DefaultCollectionCatchupDBPollInterval = 1 * time.Second

	// DefaultMissingCollsRequestInterval is the interval at which the syncer checks missing collections
	// and re-requests them from the network if needed.
	DefaultMissingCollsRequestInterval = 1 * time.Minute

	// DefaultMissingCollsForBlockThreshold is the threshold number of blocks with missing collections
	// beyond which collections should be re-requested. This prevents spamming the collection nodes
	// with requests for recent data.
	DefaultMissingCollsForBlockThreshold = 100

	// DefaultMissingCollsForAgeThreshold is the block height threshold below which collections
	// should be re-requested, regardless of the number of blocks for which collections are missing.
	// This is to ensure that if a collection is missing for a long time (in terms of block height)
	// it is eventually re-requested.
	DefaultMissingCollsForAgeThreshold = 100
)
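
One way to read the two thresholds together is as an either/or trigger: re-request when enough blocks are incomplete, or when the gap between the last full block height and the latest finalized height has grown large. The sketch below is only an illustration of that reading. The helper name `shouldRerequest`, its parameters, and the inclusive comparisons are assumptions and are not taken from the package.

```go
package main

// Values copied from the documented defaults.
const (
	missingCollsForBlockThreshold = 100
	missingCollsForAgeThreshold   = 100
)

// shouldRerequest is a hypothetical helper, not part of the package API.
//
// incompleteBlocks is the number of finalized blocks that still miss collections.
// lastFullHeight is the highest height for which all collections are indexed.
// finalizedHeight is the latest finalized height.
//
// Assumption: both comparisons are inclusive (>=); the real code may differ.
func shouldRerequest(incompleteBlocks int, lastFullHeight, finalizedHeight uint64) bool {
	// Many blocks are incomplete: re-request regardless of age.
	if incompleteBlocks >= missingCollsForBlockThreshold {
		return true
	}
	// Guard against unsigned underflow if the inputs are inconsistent.
	if finalizedHeight < lastFullHeight {
		return false
	}
	// Collections have been missing for many heights: re-request regardless of count.
	return finalizedHeight-lastFullHeight >= missingCollsForAgeThreshold
}
```

With these defaults, `shouldRerequest(1, 10, 20)` is false (one incomplete block, ten heights behind), while `shouldRerequest(1, 10, 110)` is true because the age threshold is reached.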

Variables

This section is empty.

Functions

This section is empty.

Types

type CollectionIndexer

type CollectionIndexer interface {
	// OnCollectionReceived notifies the collection indexer that a new collection is available to be indexed.
	// Calling this method multiple times with the same collection is a no-op.
	// This method is non-blocking.
	OnCollectionReceived(collection *flow.Collection)

	// MissingCollectionsAtHeight returns all collections that are not present in storage for a specific
	// finalized block height.
	//
	// Expected error returns during normal operation:
	//   - [storage.ErrNotFound]: if provided block height is not finalized or below this node's root block
	MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)

	// IsCollectionInStorage checks whether the given collection is present in local storage.
	//
	// No error returns are expected during normal operation.
	IsCollectionInStorage(collectionID flow.Identifier) (bool, error)
}
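
For unit tests of code that depends on CollectionIndexer, an in-memory double can be enough. The sketch below uses stand-in types (`Identifier`, `Collection`, `CollectionGuarantee`, `errNotFound`) instead of the real `flow` and `storage` packages, so it shows the contract of the three methods rather than the actual implementation. Unlike the real indexer, it stores collections synchronously.

```go
package main

import (
	"errors"
	"sync"
)

// Stand-in types for this sketch; the real ones live in the flow package.
type Identifier [32]byte

type Collection struct {
	ID Identifier
}

type CollectionGuarantee struct {
	CollectionID Identifier
}

// errNotFound stands in for storage.ErrNotFound.
var errNotFound = errors.New("not found")

// memIndexer is a hypothetical in-memory test double with the same method set
// as CollectionIndexer, expressed with the stand-in types above.
type memIndexer struct {
	mu         sync.Mutex
	stored     map[Identifier]struct{}
	guarantees map[uint64][]*CollectionGuarantee // finalized height -> guarantees
}

func newMemIndexer(guarantees map[uint64][]*CollectionGuarantee) *memIndexer {
	return &memIndexer{
		stored:     make(map[Identifier]struct{}),
		guarantees: guarantees,
	}
}

// OnCollectionReceived records the collection. Repeated calls with the same
// collection leave the state unchanged.
func (m *memIndexer) OnCollectionReceived(c *Collection) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.stored[c.ID] = struct{}{}
}

// MissingCollectionsAtHeight returns the guarantees at the given height whose
// collections have not been received yet, or errNotFound for unknown heights.
func (m *memIndexer) MissingCollectionsAtHeight(height uint64) ([]*CollectionGuarantee, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	gs, ok := m.guarantees[height]
	if !ok {
		return nil, errNotFound
	}
	var missing []*CollectionGuarantee
	for _, g := range gs {
		if _, have := m.stored[g.CollectionID]; !have {
			missing = append(missing, g)
		}
	}
	return missing, nil
}

// IsCollectionInStorage reports whether the collection has been received.
func (m *memIndexer) IsCollectionInStorage(id Identifier) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.stored[id]
	return ok, nil
}
```

A map keyed by collection ID makes the "multiple calls are a no-op" property fall out naturally, which is why the double does not need any explicit duplicate check.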

type ExecutionDataSyncer

type ExecutionDataSyncer struct {
	// contains filtered or unexported fields
}

ExecutionDataSyncer submits collections from execution data to the collections indexer. It is designed to be used within the collection syncer to optimize indexing when collection data is already available on the node.

func NewExecutionDataSyncer

func NewExecutionDataSyncer(
	executionDataCache execution_data.ExecutionDataCache,
	indexer CollectionIndexer,
) *ExecutionDataSyncer

func (*ExecutionDataSyncer) IndexForHeight

func (s *ExecutionDataSyncer) IndexForHeight(ctx context.Context, height uint64) (bool, error)

IndexForHeight indexes the collections for a given height using locally available execution data. Returns false and no error if execution data for the block is not available.

No error returns are expected during normal operation.

func (*ExecutionDataSyncer) IndexFromStartHeight

func (s *ExecutionDataSyncer) IndexFromStartHeight(ctx context.Context, lastFullBlockHeight uint64) (uint64, error)

IndexFromStartHeight indexes the collections for all blocks with available execution data starting from the last full block height. Returns the last indexed height.

No error returns are expected during normal operation.
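
The relationship between the two methods can be pictured as a loop over IndexForHeight that stops at the first height without execution data. The sketch below is written under that assumption. The `heightIndexer` interface, the `availableUpTo` toy type, and the choice to start at `lastFullBlockHeight + 1` are illustrative and not confirmed by the package documentation.

```go
package main

import "context"

// heightIndexer captures only the IndexForHeight method used by this sketch.
type heightIndexer interface {
	IndexForHeight(ctx context.Context, height uint64) (bool, error)
}

// indexFromStartHeight indexes consecutive heights after lastFullBlockHeight
// until execution data is unavailable, and returns the last indexed height.
// Assumption: indexing starts at lastFullBlockHeight+1.
func indexFromStartHeight(ctx context.Context, idx heightIndexer, lastFullBlockHeight uint64) (uint64, error) {
	last := lastFullBlockHeight
	for {
		// Stop early if the caller cancelled the context.
		if err := ctx.Err(); err != nil {
			return last, err
		}
		ok, err := idx.IndexForHeight(ctx, last+1)
		if err != nil {
			return last, err
		}
		if !ok {
			// No execution data for the next height: nothing more to index.
			return last, nil
		}
		last++
	}
}

// availableUpTo is a toy heightIndexer: execution data exists for every
// height less than or equal to its value.
type availableUpTo uint64

func (a availableUpTo) IndexForHeight(_ context.Context, height uint64) (bool, error) {
	return height <= uint64(a), nil
}

// exampleIndexFromStart starts at height 10 with data available up to 15.
func exampleIndexFromStart() (uint64, error) {
	return indexFromStartHeight(context.Background(), availableUpTo(15), 10)
}
```

In this toy setup `exampleIndexFromStart` returns 15, the highest height for which the stand-in reports available execution data.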

type Indexer

type Indexer struct {
	// contains filtered or unexported fields
}

Indexer stores and indexes collections received from the network. It is designed to be the central point for accumulating collections from various subsystems that may receive them from the network. For example, collections may be received from execution data sync, the collection syncer, or the execution state indexer. Depending on the node's configuration, one or more of these subsystems will feed the indexer with collections.

The indexer also maintains the last full block height state, which is the highest block height for which all collections are stored and indexed.
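
The definition of the last full block height implies that it can only advance across a contiguous run of complete blocks. A minimal sketch of that rule follows. The function `advanceLastFull` and its `isComplete` callback are hypothetical names for illustration; the real indexer persists the value through a `counters.PersistentStrictMonotonicCounter`.

```go
package main

// advanceLastFull returns the highest height h in (lastFull, finalized] such
// that every height from lastFull+1 through h is complete. If the very next
// height is incomplete, lastFull is returned unchanged.
func advanceLastFull(lastFull, finalized uint64, isComplete func(height uint64) bool) uint64 {
	for h := lastFull + 1; h <= finalized; h++ {
		if !isComplete(h) {
			// A gap blocks all progress, even if later heights are complete.
			break
		}
		lastFull = h
	}
	return lastFull
}
```

For example, with a last full height of 10, a finalized height of 20, and only height 14 incomplete, the function returns 13: heights 15 through 20 do not count until 14 is filled in.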

func NewIndexer

func NewIndexer(
	log zerolog.Logger,
	metrics module.CollectionExecutedMetric,
	state protocol.State,
	blocks storage.Blocks,
	collections storage.Collections,
	lastFullBlockHeight *counters.PersistentStrictMonotonicCounter,
	lockManager lockctx.Manager,
) (*Indexer, error)

NewIndexer creates a new Indexer.

No error returns are expected during normal operation.

func (*Indexer) IsCollectionInStorage

func (ci *Indexer) IsCollectionInStorage(collectionID flow.Identifier) (bool, error)

IsCollectionInStorage checks whether the given collection is present in local storage.

No error returns are expected during normal operation.

func (*Indexer) MissingCollectionsAtHeight

func (ci *Indexer) MissingCollectionsAtHeight(height uint64) ([]*flow.CollectionGuarantee, error)

MissingCollectionsAtHeight returns all collections that are not present in storage for a specific finalized block height.

Expected error returns during normal operation:

  - storage.ErrNotFound: if provided block height is not finalized or below this node's root block

func (*Indexer) OnCollectionReceived

func (ci *Indexer) OnCollectionReceived(collection *flow.Collection)

OnCollectionReceived notifies the collection indexer that a new collection is available to be indexed. Calling this method multiple times with the same collection is a no-op. This method is non-blocking.

func (*Indexer) WorkerLoop

func (ci *Indexer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)

WorkerLoop is a component.ComponentWorker that continuously processes collections submitted to the indexer and maintains the last full block height state.

There should only be a single instance of this method running at a time.

type Syncer

type Syncer struct {
	// contains filtered or unexported fields
}

The Syncer is responsible for syncing collections for finalized blocks from the network. It has three main responsibilities:

  1. Handle requests for collections for finalized blocks from the ingestion engine by submitting the requests to Collection nodes.
  2. Track blocks with missing collections, and periodically re-request the missing collections from the network.
  3. Submit collections received to the Indexer for storage and indexing.

The Syncer guarantees that all collections for finalized blocks will eventually be received as long as there are honest Collection nodes in each cluster, and the node is able to successfully communicate with them over the networking layer.

It is responsible for ensuring the local node has all collections contained within finalized blocks starting from the last fully synced height. It works by periodically scanning the finalized block range from the last full block height up to the latest finalized block height, identifying missing collections, and triggering requests to fetch them from the network. Once collections are retrieved, it submits them to the Indexer for storage and indexing.

It is meant to operate in a background goroutine as part of the node's ingestion pipeline.
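
The periodic scan described above can be sketched as a walk over the finalized range that asks for the missing collections at each height and issues a request for each one. The names `missingAtHeight` and `scanAndRequest` are assumptions for this sketch, and collection IDs are simplified to strings; the real Syncer works with `flow.CollectionGuarantee` values and a `module.Requester`.

```go
package main

// missingAtHeight mirrors the shape of MissingCollectionsAtHeight, with
// collection IDs simplified to strings for this sketch.
type missingAtHeight func(height uint64) ([]string, error)

// scanAndRequest walks the heights in (lastFull, finalized], calls request
// for every missing collection, and returns how many blocks are incomplete.
func scanAndRequest(
	lastFull, finalized uint64,
	missing missingAtHeight,
	request func(height uint64, id string),
) (int, error) {
	incomplete := 0
	for h := lastFull + 1; h <= finalized; h++ {
		ids, err := missing(h)
		if err != nil {
			return incomplete, err
		}
		if len(ids) == 0 {
			continue // all collections for this block are already stored
		}
		incomplete++
		for _, id := range ids {
			request(h, id)
		}
	}
	return incomplete, nil
}
```

Returning the count of incomplete blocks keeps the scan separate from any policy about when to re-request, so a caller can compare it against a threshold before sending anything to the network.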

func NewSyncer

func NewSyncer(
	log zerolog.Logger,
	requester module.Requester,
	state protocol.State,
	collections storage.Collections,
	lastFullBlockHeight counters.Reader,
	collectionIndexer CollectionIndexer,
	execDataSyncer *ExecutionDataSyncer,
) *Syncer

NewSyncer creates a new Syncer responsible for requesting, tracking, and indexing missing collections.

func (*Syncer) OnCollectionDownloaded

func (s *Syncer) OnCollectionDownloaded(_ flow.Identifier, entity flow.Entity)

OnCollectionDownloaded notifies the collection syncer that a collection has been downloaded. This callback implements [requester.HandleFunc] and is intended to be used with the requester engine. Panics if the provided entity is not a flow.Collection.
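
The panic on an unexpected entity type corresponds to a checked type assertion in the callback. The sketch below shows that pattern with stand-in types. `Entity`, `Collection`, `Header`, `onCollectionDownloaded`, and `didPanic` are all defined here for illustration, and the assumption that the callback expects a pointer type is not confirmed by the documentation.

```go
package main

import "fmt"

// Entity is a stand-in for flow.Entity, reduced to a single method.
type Entity interface{ ID() string }

// Collection is a stand-in for flow.Collection.
type Collection struct{ id string }

func (c *Collection) ID() string { return c.id }

// Header is an unrelated entity type used to trigger the panic path.
type Header struct{ id string }

func (h *Header) ID() string { return h.id }

// onCollectionDownloaded forwards the entity to submit if it is a *Collection
// and panics otherwise, since any other type indicates a wiring bug.
func onCollectionDownloaded(entity Entity, submit func(*Collection)) {
	collection, ok := entity.(*Collection)
	if !ok {
		panic(fmt.Sprintf("unexpected entity type %T", entity))
	}
	submit(collection)
}

// didPanic reports whether f panicked.
func didPanic(f func()) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	f()
	return false
}
```

Panicking rather than returning an error fits a callback whose signature has no error return and whose input type is fixed by how the requester is configured.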

func (*Syncer) RequestCollectionsForBlock

func (s *Syncer) RequestCollectionsForBlock(height uint64, missingCollections []*flow.CollectionGuarantee) error

RequestCollectionsForBlock conditionally requests missing collections for a specific block height, skipping requests if the block is already below the known last full block height.

No error returns are expected during normal operation.

func (*Syncer) WorkerLoop

func (s *Syncer) WorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)

WorkerLoop is a component.ComponentWorker that continuously monitors for missing collections, and requests them from the network if needed. It also performs an initial collection catchup on startup.
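
The initial catchup pairs naturally with DefaultCollectionCatchupTimeout and DefaultCollectionCatchupDBPollInterval: poll storage at a fixed interval until nothing is missing or the timeout expires. The sketch below illustrates that polling pattern only. `waitForCatchup`, its `caughtUp` callback, and the choice to return false on timeout instead of failing are assumptions; the documentation does not say what WorkerLoop does when the timeout is hit.

```go
package main

import (
	"context"
	"time"
)

// waitForCatchup calls caughtUp immediately and then once per pollInterval
// until it returns true, the timeout elapses, or ctx is cancelled.
// It returns true only if caughtUp reported success.
func waitForCatchup(ctx context.Context, timeout, pollInterval time.Duration, caughtUp func() bool) bool {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		if caughtUp() {
			return true
		}
		select {
		case <-ctx.Done():
			return false // timed out or cancelled before catching up
		case <-ticker.C:
			// poll again
		}
	}
}

// exampleCatchup runs one catchup that succeeds on the third poll and one
// that never succeeds and therefore times out.
func exampleCatchup() (succeeded bool, timedOut bool) {
	polls := 0
	succeeded = waitForCatchup(context.Background(), 500*time.Millisecond, time.Millisecond, func() bool {
		polls++
		return polls >= 3
	})
	timedOut = !waitForCatchup(context.Background(), 5*time.Millisecond, time.Millisecond, func() bool {
		return false
	})
	return succeeded, timedOut
}
```

With the documented defaults the same helper would be called with a 30 second timeout and a 1 second poll interval.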
