fetcher

package
v0.44.0-unsafe-collect...
Warning

This package is not in the latest version of its module.

Published: Nov 24, 2025 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BlockProcessor

type BlockProcessor struct {
	// contains filtered or unexported fields
}

BlockProcessor implements the job lifecycle for collection indexing. It orchestrates the flow: request → receive → index → complete.

func NewBlockProcessor

NewBlockProcessor creates a new BlockProcessor.

Parameters:

  • log: Logger for the component
  • mcq: MissingCollectionQueue for tracking missing collections and callbacks
  • indexer: BlockCollectionIndexer for storing and indexing collections
  • requester: CollectionRequester for requesting collections from the network

No error returns are expected during normal operation.

func (*BlockProcessor) FetchCollections

func (bp *BlockProcessor) FetchCollections(
	ctx irrecoverable.SignalerContext,
	block *flow.Block,
	done func(),
) error

FetchCollections processes a block for collection fetching. It checks if the block is already indexed, and if not, enqueues missing collections and optionally requests them based on EDI lag.

No error returns are expected during normal operation.

func (*BlockProcessor) MissingCollectionQueueSize

func (bp *BlockProcessor) MissingCollectionQueueSize() uint

MissingCollectionQueueSize returns the number of missing collections currently in the queue.

func (*BlockProcessor) OnReceiveCollection

func (bp *BlockProcessor) OnReceiveCollection(originID flow.Identifier, collection *flow.Collection) error

OnReceiveCollection is called when a collection is received from the requester. It passes the collection to the MissingCollectionQueue (MCQ) and, if the collection completes a block, indexes the block's collections and marks the block as done.

No error returns are expected during normal operation.
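
The receive → index → complete steps described above can be sketched as follows. This is a minimal illustration, not the package's implementation: the `collection`, `queue`, and `indexer` types, the `IndexCollectionsForBlock` method name, and the stubs are hypothetical stand-ins that only mirror the calls named in this documentation.

```go
package main

import "fmt"

// collection is a hypothetical stand-in for *flow.Collection.
type collection struct{ id string }

// queue and indexer are hypothetical interfaces that mirror only the calls
// named in the description; they are not the package's real types.
type queue interface {
	OnReceivedCollection(c *collection) ([]*collection, uint64, bool)
	OnIndexedForBlock(height uint64) (func(), bool)
}

type indexer interface {
	IndexCollectionsForBlock(height uint64, cols []*collection) error
}

// onReceive sketches the receive -> index -> complete steps.
func onReceive(q queue, idx indexer, c *collection) error {
	cols, height, complete := q.OnReceivedCollection(c)
	if !complete {
		return nil // the block is still waiting for other collections
	}
	if err := idx.IndexCollectionsForBlock(height, cols); err != nil {
		return fmt.Errorf("could not index collections for height %d: %w", height, err)
	}
	// Invoke the job callback only if the height was still tracked.
	if done, ok := q.OnIndexedForBlock(height); ok && done != nil {
		done()
	}
	return nil
}

// stubQueue reports height 7 as complete once `want` collections have arrived.
type stubQueue struct {
	want int
	got  []*collection
	done func()
}

func (s *stubQueue) OnReceivedCollection(c *collection) ([]*collection, uint64, bool) {
	s.got = append(s.got, c)
	if len(s.got) < s.want {
		return nil, 0, false
	}
	return s.got, 7, true
}

func (s *stubQueue) OnIndexedForBlock(height uint64) (func(), bool) {
	return s.done, s.done != nil
}

// stubIndexer records the heights it was asked to index.
type stubIndexer struct{ indexed []uint64 }

func (s *stubIndexer) IndexCollectionsForBlock(height uint64, cols []*collection) error {
	s.indexed = append(s.indexed, height)
	return nil
}

func main() {
	finished := false
	q := &stubQueue{want: 2, done: func() { finished = true }}
	idx := &stubIndexer{}
	_ = onReceive(q, idx, &collection{id: "a"})
	fmt.Println(len(idx.indexed), finished) // 0 false
	_ = onReceive(q, idx, &collection{id: "b"})
	fmt.Println(idx.indexed, finished) // [7] true
}
```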

func (*BlockProcessor) PruneUpToHeight

func (bp *BlockProcessor) PruneUpToHeight(height uint64)

PruneUpToHeight removes all tracked heights up to and including the given height.

type CollectionRequester

type CollectionRequester struct {
	// contains filtered or unexported fields
}

CollectionRequester requests collections from collection nodes on the network. It implements the collection_sync.CollectionRequester interface.

func NewCollectionRequester

func NewCollectionRequester(
	requester module.Requester,
	state protocol.State,
) *CollectionRequester

NewCollectionRequester creates a new CollectionRequester.

Parameters:

  • requester: The requester engine for requesting entities from the network
  • state: Protocol state for finding guarantors

No error returns are expected during normal operation.

func (*CollectionRequester) RequestCollectionsByGuarantees

func (cr *CollectionRequester) RequestCollectionsByGuarantees(guarantees []*flow.CollectionGuarantee) error

RequestCollectionsByGuarantees requests collections by their guarantees from collection nodes on the network. For each guarantee, it finds the guarantors and requests the collection from them.

No error returns are expected during normal operation.

type Fetcher

type Fetcher struct {
	component.Component
	// contains filtered or unexported fields
}

Fetcher is a component that consumes finalized block jobs and processes them to index collections. It uses a job consumer with windowed throttling to prevent node overload.

func NewFetcher

func NewFetcher(
	log zerolog.Logger,
	blockProcessor collection_sync.BlockProcessor,
	progressInitializer storage.ConsumerProgressInitializer,
	state protocol.State,
	blocks storage.Blocks,
	maxProcessing uint64,
	maxSearchAhead uint64,
	metrics module.CollectionSyncMetrics,
) (*Fetcher, error)

NewFetcher creates a new Fetcher component.

Parameters:

  • log: Logger for the component
  • blockProcessor: BlockProcessor implementation for processing collection indexing jobs
  • progressInitializer: Initializer for tracking processed block heights
  • state: Protocol state for reading finalized block information
  • blocks: Blocks storage for reading blocks by height
  • maxProcessing: Maximum number of jobs to process concurrently
  • maxSearchAhead: Maximum number of jobs beyond processedIndex to process. 0 means no limit
  • metrics: Optional metrics collector for reporting collection fetched height

No error returns are expected during normal operation.
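
To illustrate how maxProcessing and maxSearchAhead could bound the work, here is a small sketch of a windowed check. The helpers `withinSearchWindow` and `canStart` are hypothetical, and the inclusive upper bound is an assumption. The job consumer's actual rule is not spelled out in this documentation.

```go
package main

import "fmt"

// withinSearchWindow reports whether job index next lies within maxSearchAhead
// jobs of processedIndex. A limit of 0 means no limit. Hypothetical helper.
func withinSearchWindow(processedIndex, next, maxSearchAhead uint64) bool {
	if maxSearchAhead == 0 || next <= processedIndex {
		return true
	}
	// Assumption: the bound is inclusive.
	return next-processedIndex <= maxSearchAhead
}

// canStart additionally applies the maxProcessing concurrency cap.
func canStart(inFlight, maxProcessing, processedIndex, next, maxSearchAhead uint64) bool {
	return inFlight < maxProcessing && withinSearchWindow(processedIndex, next, maxSearchAhead)
}

func main() {
	fmt.Println(withinSearchWindow(100, 105, 5))  // true
	fmt.Println(withinSearchWindow(100, 106, 5))  // false
	fmt.Println(withinSearchWindow(100, 9999, 0)) // true
	fmt.Println(canStart(3, 3, 100, 101, 5))      // false
}
```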

func (*Fetcher) OnFinalizedBlock

func (s *Fetcher) OnFinalizedBlock()

OnFinalizedBlock is called when a new block is finalized. It notifies the job consumer that new work is available.

func (*Fetcher) ProcessedHeight

func (s *Fetcher) ProcessedHeight() uint64

ProcessedHeight returns the last processed job index.

func (*Fetcher) Size

func (s *Fetcher) Size() uint

Size returns the number of in-memory jobs that the consumer is processing. It is an optional method: not required for operation, but useful for monitoring.

type MissingCollectionQueue

type MissingCollectionQueue struct {
	// contains filtered or unexported fields
}

MissingCollectionQueue helps the job processor keep track of jobs and their callbacks. Note that it DOES NOT index collections itself. It only tracks which collections are missing for each block height and, once all collections for a block height have been received, returns them to the caller for processing (storing and indexing). The caller then notifies the queue that processing is complete, and the queue hands back the job's callback so that the job can be marked as done. This decouples the MissingCollectionQueue from the actual processing of collections, keeps all state in memory, and lets callers hold the lock for less time, which reduces contention.

The caller is responsible for checking if collections are already in storage before enqueueing them. Only collections that are actually missing should be passed to EnqueueMissingCollections.

MissingCollectionQueue is safe for concurrent use.
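
The bookkeeping described above can be pictured with a toy version of the queue. This is a sketch under simplifying assumptions, not the real type: `toyQueue` and its methods are hypothetical, collection IDs are plain strings rather than flow.Identifier, and callbacks are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// toyQueue is a hypothetical, simplified stand-in for MissingCollectionQueue.
type toyQueue struct {
	mu       sync.Mutex
	missing  map[uint64]map[string]struct{} // height -> IDs still missing
	received map[uint64][]string            // height -> IDs received so far
	heightOf map[string]uint64              // collection ID -> height
}

func newToyQueue() *toyQueue {
	return &toyQueue{
		missing:  make(map[uint64]map[string]struct{}),
		received: make(map[uint64][]string),
		heightOf: make(map[string]uint64),
	}
}

// enqueue registers the missing collection IDs for a height.
// It rejects a height that is already tracked.
func (q *toyQueue) enqueue(height uint64, ids []string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.missing[height]; ok {
		return fmt.Errorf("height %d already enqueued", height)
	}
	set := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{}
		q.heightOf[id] = height
	}
	q.missing[height] = set
	return nil
}

// onReceived records one collection and reports whether its height is now complete.
func (q *toyQueue) onReceived(id string) ([]string, uint64, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	height, ok := q.heightOf[id]
	if !ok {
		return nil, 0, false // not tracked
	}
	set := q.missing[height]
	if _, stillMissing := set[id]; !stillMissing {
		return nil, 0, false // duplicate delivery
	}
	delete(set, id)
	q.received[height] = append(q.received[height], id)
	if len(set) > 0 {
		return nil, 0, false
	}
	return q.received[height], height, true
}

func main() {
	q := newToyQueue()
	_ = q.enqueue(10, []string{"a", "b"})
	fmt.Println(q.enqueue(10, []string{"c"}) != nil) // true
	_, _, done := q.onReceived("a")
	fmt.Println(done) // false
	cols, h, done := q.onReceived("b")
	fmt.Println(cols, h, done) // [a b] 10 true
}
```

The queue only decides when a height is complete. Storing and indexing the returned collections is left to the caller, which is the decoupling the description above refers to.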

func NewMissingCollectionQueue

func NewMissingCollectionQueue() *MissingCollectionQueue

NewMissingCollectionQueue creates a new MissingCollectionQueue.

No error returns are expected during normal operation.

func (*MissingCollectionQueue) EnqueueMissingCollections

func (mcq *MissingCollectionQueue) EnqueueMissingCollections(
	blockHeight uint64,
	collectionIDs []flow.Identifier,
	callback func(),
) error

EnqueueMissingCollections registers missing collections for a block height along with a callback that will be invoked when all collections for that height have been received and indexed.

The caller is responsible for checking if collections are already in storage before calling this method. Only collections that are actually missing should be passed in collectionIDs.

Returns an error if the block height is already enqueued to prevent overwriting existing jobs.

func (*MissingCollectionQueue) IsHeightQueued

func (mcq *MissingCollectionQueue) IsHeightQueued(height uint64) bool

IsHeightQueued returns true if the given height has queued collections. It returns false if the height is not tracked.

func (*MissingCollectionQueue) OnIndexedForBlock

func (mcq *MissingCollectionQueue) OnIndexedForBlock(blockHeight uint64) (func(), bool)

OnIndexedForBlock notifies the queue that a block height has been indexed, removes that block height from tracking, and returns the callback for the caller to invoke.

Returns: (callback, true) if the height existed and was processed; (nil, false) if the height was not tracked.

Note: the caller should invoke the returned callback if it is not nil.

Behavior: OnIndexedForBlock can return the callback even before the block is complete (i.e., before all collections have been received). This allows the caller to index a block with partial collections if needed. After indexing:

  • The block is removed from tracking (IsHeightQueued returns false)
  • All collection-to-height mappings for this block are cleaned up
  • Any remaining missing collections are removed from tracking
  • Subsequent OnReceivedCollection calls for collections belonging to this block will return (nil, 0, false) because the block has been removed
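
The cleanup listed above can be sketched with a small tracker. This is an illustrative sketch only: `tracker`, `onIndexed`, and `isQueued` are hypothetical names, IDs are plain strings, and locking is omitted for brevity.

```go
package main

import "fmt"

// tracker is a hypothetical, single-goroutine stand-in for the queue's bookkeeping.
type tracker struct {
	callbacks map[uint64]func()   // height -> job callback
	ids       map[uint64][]string // height -> all collection IDs of the block
	heightOf  map[string]uint64   // collection ID -> height
}

// onIndexed removes the height and every mapping that points at it, and hands
// the stored callback back to the caller. It does not check completeness.
func (t *tracker) onIndexed(height uint64) (func(), bool) {
	cb, ok := t.callbacks[height]
	if !ok {
		return nil, false
	}
	for _, id := range t.ids[height] {
		delete(t.heightOf, id)
	}
	delete(t.ids, height)
	delete(t.callbacks, height)
	return cb, true
}

// isQueued reports whether the height is still tracked.
func (t *tracker) isQueued(height uint64) bool {
	_, ok := t.callbacks[height]
	return ok
}

func main() {
	called := false
	tr := &tracker{
		callbacks: map[uint64]func(){42: func() { called = true }},
		ids:       map[uint64][]string{42: {"a", "b"}},
		heightOf:  map[string]uint64{"a": 42, "b": 42},
	}
	// Index height 42 although "a" and "b" never arrived.
	if cb, ok := tr.onIndexed(42); ok && cb != nil {
		cb()
	}
	fmt.Println(called, tr.isQueued(42), len(tr.heightOf)) // true false 0
	_, ok := tr.onIndexed(42)
	fmt.Println(ok) // false
}
```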

func (*MissingCollectionQueue) OnReceivedCollection

func (mcq *MissingCollectionQueue) OnReceivedCollection(
	collection *flow.Collection,
) ([]*flow.Collection, uint64, bool)

OnReceivedCollection notifies the queue that a collection has been received. It checks if the block height is now complete and returns the collections and height.

The collection parameter should be the actual collection object received from the requester.

Returns:

  • (collections, height, true) if the block height became complete
  • (nil, 0, false) if no block height became complete

func (*MissingCollectionQueue) PruneUpToHeight

func (mcq *MissingCollectionQueue) PruneUpToHeight(height uint64) []func()

PruneUpToHeight removes all block jobs and their collection mappings for block heights less than or equal to the specified height.
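
A pruning pass of this kind might be sketched as below. The documentation does not describe the returned []func() slice. The sketch assumes it holds the callbacks of the pruned jobs, and returning them in ascending height order is a further choice made here for determinism. `pruneUpTo` is a hypothetical helper, not the method itself.

```go
package main

import (
	"fmt"
	"sort"
)

// pruneUpTo deletes every job at or below height and returns the callbacks of
// the deleted jobs in ascending height order.
// Assumption: the returned slice carries the callbacks of the pruned jobs.
func pruneUpTo(jobs map[uint64]func(), height uint64) []func() {
	var heights []uint64
	for h := range jobs {
		if h <= height {
			heights = append(heights, h)
		}
	}
	sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] })

	callbacks := make([]func(), 0, len(heights))
	for _, h := range heights {
		callbacks = append(callbacks, jobs[h])
		delete(jobs, h)
	}
	return callbacks
}

func main() {
	var order []uint64
	jobs := map[uint64]func(){}
	for _, h := range []uint64{12, 10, 11} {
		h := h // capture the per-iteration value
		jobs[h] = func() { order = append(order, h) }
	}
	for _, cb := range pruneUpTo(jobs, 11) {
		cb()
	}
	fmt.Println(order, len(jobs)) // [10 11] 1
}
```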

func (*MissingCollectionQueue) Size

func (mcq *MissingCollectionQueue) Size() uint

Size returns the number of missing collections currently in the queue. This is the total number of collections across all block heights that are still missing.
