Documentation
¶
Index ¶
- type BlockProcessor
- func (bp *BlockProcessor) FetchCollections(ctx irrecoverable.SignalerContext, block *flow.Block, done func()) error
- func (bp *BlockProcessor) MissingCollectionQueueSize() uint
- func (bp *BlockProcessor) OnReceiveCollection(originID flow.Identifier, collection *flow.Collection) error
- func (bp *BlockProcessor) PruneUpToHeight(height uint64)
- type CollectionRequester
- type Fetcher
- type MissingCollectionQueue
- func (mcq *MissingCollectionQueue) EnqueueMissingCollections(blockHeight uint64, collectionIDs []flow.Identifier, callback func()) error
- func (mcq *MissingCollectionQueue) IsHeightQueued(height uint64) bool
- func (mcq *MissingCollectionQueue) OnIndexedForBlock(blockHeight uint64) (func(), bool)
- func (mcq *MissingCollectionQueue) OnReceivedCollection(collection *flow.Collection) ([]*flow.Collection, uint64, bool)
- func (mcq *MissingCollectionQueue) PruneUpToHeight(height uint64) []func()
- func (mcq *MissingCollectionQueue) Size() uint
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BlockProcessor ¶
type BlockProcessor struct {
// contains filtered or unexported fields
}
BlockProcessor implements the job lifecycle for collection indexing. It orchestrates the flow: request → receive → index → complete.
func NewBlockProcessor ¶
func NewBlockProcessor( log zerolog.Logger, mcq collection_sync.MissingCollectionQueue, indexer collection_sync.BlockCollectionIndexer, requester collection_sync.CollectionRequester, ) *BlockProcessor
NewBlockProcessor creates a new BlockProcessor.
Parameters:
- log: Logger for the component
- mcq: MissingCollectionQueue for tracking missing collections and callbacks
- indexer: BlockCollectionIndexer for storing and indexing collections
- requester: CollectionRequester for requesting collections from the network
No error returns are expected during normal operation.
func (*BlockProcessor) FetchCollections ¶
func (bp *BlockProcessor) FetchCollections( ctx irrecoverable.SignalerContext, block *flow.Block, done func(), ) error
FetchCollections processes a block for collection fetching. It checks if the block is already indexed, and if not, enqueues missing collections and optionally requests them based on EDI lag.
No error returns are expected during normal operation.
func (*BlockProcessor) MissingCollectionQueueSize ¶
func (bp *BlockProcessor) MissingCollectionQueueSize() uint
MissingCollectionQueueSize returns the number of missing collections currently in the queue.
func (*BlockProcessor) OnReceiveCollection ¶
func (bp *BlockProcessor) OnReceiveCollection(originID flow.Identifier, collection *flow.Collection) error
OnReceiveCollection is called when a collection is received from the requester. It passes the collection to MCQ, and if it completes a block, indexes it and marks it as done.
No error returns are expected during normal operation.
func (*BlockProcessor) PruneUpToHeight ¶
func (bp *BlockProcessor) PruneUpToHeight(height uint64)
PruneUpToHeight removes all tracked heights up to and including the given height.
type CollectionRequester ¶
type CollectionRequester struct {
// contains filtered or unexported fields
}
CollectionRequester requests collections from collection nodes on the network. It implements the collection_sync.CollectionRequester interface.
func NewCollectionRequester ¶
func NewCollectionRequester( requester module.Requester, state protocol.State, ) *CollectionRequester
NewCollectionRequester creates a new CollectionRequester.
Parameters:
- requester: The requester engine for requesting entities from the network
- state: Protocol state for finding guarantors
No error returns are expected during normal operation.
func (*CollectionRequester) RequestCollectionsByGuarantees ¶
func (cr *CollectionRequester) RequestCollectionsByGuarantees(guarantees []*flow.CollectionGuarantee) error
RequestCollectionsByGuarantees requests collections by their guarantees from collection nodes on the network. For each guarantee, it finds the guarantors and requests the collection from them.
No error returns are expected during normal operation.
type Fetcher ¶
Fetcher is a component that consumes finalized block jobs and processes them to index collections. It uses a job consumer with windowed throttling to prevent node overload.
func NewFetcher ¶
func NewFetcher( log zerolog.Logger, blockProcessor collection_sync.BlockProcessor, progressInitializer storage.ConsumerProgressInitializer, state protocol.State, blocks storage.Blocks, maxProcessing uint64, maxSearchAhead uint64, metrics module.CollectionSyncMetrics, ) (*Fetcher, error)
NewFetcher creates a new Fetcher component.
Parameters:
- log: Logger for the component
- blockProcessor: BlockProcessor implementation for processing collection indexing jobs
- progressInitializer: Initializer for tracking processed block heights
- state: Protocol state for reading finalized block information
- blocks: Blocks storage for reading blocks by height
- maxProcessing: Maximum number of jobs to process concurrently
- maxSearchAhead: Maximum number of jobs beyond processedIndex to process. 0 means no limit
- metrics: Optional metrics collector for reporting collection fetched height
No error returns are expected during normal operation.
func (*Fetcher) OnFinalizedBlock ¶
func (s *Fetcher) OnFinalizedBlock()
OnFinalizedBlock is called when a new block is finalized. It notifies the job consumer that new work is available.
func (*Fetcher) ProcessedHeight ¶
LastProcessedIndex returns the last processed job index.
type MissingCollectionQueue ¶
type MissingCollectionQueue struct {
// contains filtered or unexported fields
}
MissingCollectionQueue helps the job processor to keep track of the jobs and their callbacks. Note, it DOES NOT index collections directly, instead, it only keeps track of which collections are missing for each block height, and when all collections for a block height have been received, it returns the collections to the caller for processing (storing and indexing). And let the caller to notify the completion of the processing, so that it can mark the job as done by calling the callback. This allows the MissingCollectionQueue to be decoupled from the actual processing of the collections, keep all states in memory and allow the different callers to hold the lock less time and reduce contention.
The caller is responsible for checking if collections are already in storage before enqueueing them. Only collections that are actually missing should be passed to EnqueueMissingCollections.
MissingCollectionQueue is safe for concurrent use.
func NewMissingCollectionQueue ¶
func NewMissingCollectionQueue() *MissingCollectionQueue
NewMissingCollectionQueue creates a new MissingCollectionQueue.
No error returns are expected during normal operation.
func (*MissingCollectionQueue) EnqueueMissingCollections ¶
func (mcq *MissingCollectionQueue) EnqueueMissingCollections( blockHeight uint64, collectionIDs []flow.Identifier, callback func(), ) error
EnqueueMissingCollections registers missing collections for a block height along with a callback that will be invoked when all collections for that height have been received and indexed.
The caller is responsible for checking if collections are already in storage before calling this method. Only collections that are actually missing should be passed in collectionIDs.
Returns an error if the block height is already enqueued to prevent overwriting existing jobs.
func (*MissingCollectionQueue) IsHeightQueued ¶
func (mcq *MissingCollectionQueue) IsHeightQueued(height uint64) bool
IsHeightQueued returns true if the given height has queued collections Returns false if the height is not tracked
func (*MissingCollectionQueue) OnIndexedForBlock ¶
func (mcq *MissingCollectionQueue) OnIndexedForBlock(blockHeight uint64) (func(), bool)
OnIndexedForBlock notifies the queue that a block height has been indexed, removes that block height from tracking, and return the callback for caller to invoke.
Returns: (callback, true) if the height existed and was processed; (nil, false) if the height was not tracked.
Note, caller should invoke the returned callback if not nil.
Behavior: OnIndexedForBlock can return the callback even before the block is complete (i.e., before all collections have been received). This allows the caller to index a block with partial collections if needed. After indexing:
- The block is removed from tracking (IsHeightQueued returns false)
- All collection-to-height mappings for this block are cleaned up
- Any remaining missing collections are removed from tracking
- Subsequent OnReceivedCollection calls for collections belonging to this block will return (nil, 0, false) because the block has been removed
func (*MissingCollectionQueue) OnReceivedCollection ¶
func (mcq *MissingCollectionQueue) OnReceivedCollection( collection *flow.Collection, ) ([]*flow.Collection, uint64, bool)
OnReceivedCollection notifies the queue that a collection has been received. It checks if the block height is now complete and returns the collections and height.
The collection parameter should be the actual collection object received from the requester.
Returns:
- (collections, height, true) if the block height became complete
- (nil, 0, false) if no block height became complete
func (*MissingCollectionQueue) PruneUpToHeight ¶
func (mcq *MissingCollectionQueue) PruneUpToHeight(height uint64) []func()
PruneUpToHeight removes all block jobs and their collection mappings for block heights less than or equal to the specified height.
func (*MissingCollectionQueue) Size ¶
func (mcq *MissingCollectionQueue) Size() uint
Size returns the number of missing collections currently in the queue. This is the total number of collections across all block heights that are still missing.