ingestion2

package
v0.42.4-pebble.4-fix-a...
Warning

This package is not in the latest version of its module.
Published: Oct 22, 2025 License: AGPL-3.0 Imports: 21 Imported by: 0

Documentation

Overview

Package ingestion2 implements a modular ingestion engine responsible for orchestrating the processing of finalized blockchain data and receiving execution receipts from the network.

The Engine coordinates several internal workers, each dedicated to a specific task:

  • Receiving and persisting execution receipts from the network.
  • Subscribing to finalized block events.
  • Synchronizing collections associated with finalized blocks.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CollectionSyncer

type CollectionSyncer struct {
	// contains filtered or unexported fields
}

The CollectionSyncer type provides mechanisms for syncing and indexing data from the Flow blockchain into local storage. Specifically, it handles the retrieval and processing of collections and transactions that may have been missed due to network delays, restarts, or gaps in finalization.

It is responsible for ensuring the local node has all collections associated with finalized blocks starting from the last fully synced height. It works by periodically scanning the finalized block range, identifying missing collections, and triggering requests to fetch them from the network. Once collections are retrieved, it ensures they are persisted in the local collection and transaction stores.
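
The scan described above can be pictured with a small, self-contained sketch. The in-memory maps and the `missingCollections` helper are hypothetical simplifications (the real syncer reads from `storage.Blocks` and `storage.Collections`), and collection IDs are plain strings here for brevity.

```go
package main

import "fmt"

// guaranteesByHeight is a hypothetical in-memory view of finalized blocks:
// block height -> IDs of the collections guaranteed in that block.
type guaranteesByHeight map[uint64][]string

// missingCollections scans the heights in (lastFull, finalized] and returns,
// per height, the collection IDs that are not yet in the local store.
func missingCollections(blocks guaranteesByHeight, stored map[string]bool, lastFull, finalized uint64) map[uint64][]string {
	missing := make(map[uint64][]string)
	for h := lastFull + 1; h <= finalized; h++ {
		for _, id := range blocks[h] {
			if !stored[id] {
				missing[h] = append(missing[h], id)
			}
		}
	}
	return missing
}

func main() {
	blocks := guaranteesByHeight{
		11: {"a", "b"},
		12: {"c"},
		13: {},
	}
	stored := map[string]bool{"a": true, "c": true}

	// Heights up to 10 are fully synced; 13 is the latest finalized height.
	fmt.Println(missingCollections(blocks, stored, 10, 13)) // map[11:[b]]
}
```

Each entry in the result would then be handed to the requester, and the stored set would grow as downloads arrive.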

The syncer maintains a persistent, strictly monotonic counter (`lastFullBlockHeight`) to track the highest finalized block for which all collections have been fully indexed. It uses this information to avoid redundant processing and to measure catch-up progress.
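
To make "strictly monotonic" concrete, here is a minimal in-memory sketch. The `strictMonotonicCounter` type is hypothetical and is not the `counters.PersistentStrictMonotonicCounter` used by this package; in particular it omits persistence, and its method names and return types are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// strictMonotonicCounter is a hypothetical, non-persistent counter whose
// value can only ever increase.
type strictMonotonicCounter struct {
	mu    sync.Mutex
	value uint64
}

// Set stores v only if it is strictly greater than the current value and
// reports whether the update was applied.
func (c *strictMonotonicCounter) Set(v uint64) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if v <= c.value {
		return false
	}
	c.value = v
	return true
}

// Value returns the current value.
func (c *strictMonotonicCounter) Value() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.value
}

func main() {
	c := &strictMonotonicCounter{}
	fmt.Println(c.Set(5), c.Value()) // true 5
	fmt.Println(c.Set(5), c.Value()) // false 5 (equal values are rejected)
	fmt.Println(c.Set(3), c.Value()) // false 5 (lower values are rejected)
	fmt.Println(c.Set(8), c.Value()) // true 8
}
```

Rejecting equal and lower values is what lets the syncer treat the counter as a safe lower bound: any height at or below it never needs to be rescanned.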

It is meant to operate in a background goroutine as part of the node's ingestion pipeline.

func NewCollectionSyncer

func NewCollectionSyncer(
	logger zerolog.Logger,
	collectionExecutedMetric module.CollectionExecutedMetric,
	requester module.Requester,
	state protocol.State,
	blocks storage.Blocks,
	collections storage.Collections,
	transactions storage.Transactions,
	lastFullBlockHeight *counters.PersistentStrictMonotonicCounter,
	lockManager storage.LockManager,
) (*CollectionSyncer, error)

NewCollectionSyncer creates a new CollectionSyncer responsible for requesting, tracking, and indexing missing collections.

func (*CollectionSyncer) OnCollectionDownloaded

func (s *CollectionSyncer) OnCollectionDownloaded(id flow.Identifier, entity flow.Entity)

OnCollectionDownloaded indexes and persists a downloaded collection. This function is a callback intended to be used by the requester engine.

func (*CollectionSyncer) RequestCollectionsForBlock

func (s *CollectionSyncer) RequestCollectionsForBlock(height uint64, missingCollections []*flow.CollectionGuarantee)

RequestCollectionsForBlock conditionally requests missing collections for a specific block height, skipping requests if the block is already below the known full block height.
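
The skip condition can be sketched as follows. Everything here is hypothetical: `recordingRequester` stands in for `module.Requester`, IDs are plain strings, and treating a height equal to the full block height as already covered is an assumption (the description above only says "below").

```go
package main

import "fmt"

// recordingRequester is a hypothetical requester that only records which
// collection IDs it was asked to fetch.
type recordingRequester struct {
	requested []string
}

func (r *recordingRequester) Request(collectionID string) {
	r.requested = append(r.requested, collectionID)
}

// requestCollectionsForBlock requests the missing collections of the block at
// height, unless that height is already covered by lastFullHeight. It reports
// whether any request was attempted.
// Assumption: a height equal to lastFullHeight counts as covered.
func requestCollectionsForBlock(r *recordingRequester, lastFullHeight, height uint64, missing []string) bool {
	if height <= lastFullHeight {
		return false // everything at this height is already indexed
	}
	for _, id := range missing {
		r.Request(id)
	}
	return true
}

func main() {
	r := &recordingRequester{}
	fmt.Println(requestCollectionsForBlock(r, 100, 90, []string{"old"}))     // false
	fmt.Println(requestCollectionsForBlock(r, 100, 101, []string{"a", "b"})) // true
	fmt.Println(r.requested)                                                 // [a b]
}
```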

func (*CollectionSyncer) StartWorkerLoop

func (s *CollectionSyncer) StartWorkerLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc)

StartWorkerLoop continuously monitors and triggers collection sync operations. It handles collection catch-up on startup, periodic requests for missing collections, and updates to the full block height.

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

func New

func New(
	log zerolog.Logger,
	net network.EngineRegistry,
	finalizedBlockProcessor *FinalizedBlockProcessor,
	collectionSyncer *CollectionSyncer,
	receipts storage.ExecutionReceipts,
	collectionExecutedMetric module.CollectionExecutedMetric,
) (*Engine, error)

func (*Engine) OnFinalizedBlock

func (e *Engine) OnFinalizedBlock(_ *model.Block)

OnFinalizedBlock is called by the follower engine after a block has been finalized and the state has been updated. It receives block-finalized events from the finalization distributor and forwards them to the consumer.

func (*Engine) Process

func (e *Engine) Process(chanName channels.Channel, originID flow.Identifier, event interface{}) error

Process handles the given event from the node with the given origin ID in a blocking manner and returns any processing error once done.

No errors are expected during normal operations.

type FinalizedBlockProcessor

type FinalizedBlockProcessor struct {
	// contains filtered or unexported fields
}

FinalizedBlockProcessor handles processing of finalized blocks, including indexing and syncing of related collections and execution results.

FinalizedBlockProcessor is designed to handle the ingestion of finalized Flow blocks in a scalable and decoupled manner. It uses a jobqueue.ComponentConsumer to consume and process finalized block jobs asynchronously. This design enables the processor to handle high-throughput block finalization events without blocking other parts of the system.

The processor relies on a notifier (engine.Notifier) to signal when a new finalized block is available, which triggers the job consumer to process it. The actual processing involves indexing block-to-collection and block-to-execution-result mappings, as well as requesting the associated collections.
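
The notifier-plus-consumer pattern described above can be sketched with a capacity-one channel that coalesces repeated signals. The `notifier` and `processor` types below are hypothetical, simplified stand-ins for `engine.Notifier` and the `jobqueue.ComponentConsumer`; the real consumer also persists its progress and runs jobs asynchronously.

```go
package main

import "fmt"

// notifier is a hypothetical coalescing signal: many Notify calls made while
// a signal is already pending collapse into a single wake-up.
type notifier struct {
	ch chan struct{}
}

func newNotifier() notifier { return notifier{ch: make(chan struct{}, 1)} }

// Notify never blocks; it is a no-op if a signal is already pending.
func (n notifier) Notify() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

// Channel returns the channel a consumer waits on.
func (n notifier) Channel() <-chan struct{} { return n.ch }

// processor is a hypothetical consumer that handles every height between its
// last processed height and the latest finalized height on each wake-up.
type processor struct {
	notifier  notifier
	processed uint64
	finalized func() uint64
	handle    func(height uint64)
}

// drain processes all finalized heights that have not been handled yet.
func (p *processor) drain() {
	for h := p.processed + 1; h <= p.finalized(); h++ {
		p.handle(h)
		p.processed = h
	}
}

func main() {
	var finalized uint64
	var handled []uint64
	p := &processor{
		notifier:  newNotifier(),
		finalized: func() uint64 { return finalized },
		handle:    func(h uint64) { handled = append(handled, h) },
	}

	// Three blocks finalize before the consumer wakes up.
	for i := 0; i < 3; i++ {
		finalized++
		p.notifier.Notify()
	}

	<-p.notifier.Channel() // a single wake-up covers all three notifications
	p.drain()
	fmt.Println(handled)             // [1 2 3]
	fmt.Println(len(p.notifier.ch)) // 0
}
```

Because the signal carries no payload, the producer never blocks and the consumer decides how much work to do by comparing its own progress against the latest finalized height.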

func NewFinalizedBlockProcessor

func NewFinalizedBlockProcessor(
	log zerolog.Logger,
	state protocol.State,
	blocks storage.Blocks,
	executionResults storage.ExecutionResults,
	finalizedProcessedHeight storage.ConsumerProgressInitializer,
	syncer *CollectionSyncer,
	collectionExecutedMetric module.CollectionExecutedMetric,
) (*FinalizedBlockProcessor, error)

NewFinalizedBlockProcessor creates and initializes a new FinalizedBlockProcessor, setting up job consumer infrastructure to handle finalized block processing.

No errors are expected during normal operations.

func (*FinalizedBlockProcessor) Notify

func (p *FinalizedBlockProcessor) Notify()

Notify notifies the processor that a new finalized block is available for processing.

func (*FinalizedBlockProcessor) StartWorkerLoop

StartWorkerLoop begins processing of finalized blocks and signals readiness when initialization is complete.
