ingestion2

package
v0.43.4-access-ingesti... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2025 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package ingestion2 implements a modular ingestion engine responsible for orchestrating the processing of finalized blockchain data and receiving execution receipts from the network.

The Engine coordinates several internal workers, each dedicated to a specific task:

  • Receiving and persisting execution receipts from the network.
  • Subscribing to finalized block events.
  • Synchronizing collections associated with finalized blocks.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

func New

func New(
	log zerolog.Logger,
	net network.EngineRegistry,
	finalizedBlockProcessor *FinalizedBlockProcessor,
	collectionSyncer *collections.Syncer,
	receipts storage.ExecutionReceipts,
	collectionExecutedMetric module.CollectionExecutedMetric,
) (*Engine, error)

func (*Engine) OnFinalizedBlock

func (e *Engine) OnFinalizedBlock(_ *model.Block)

OnFinalizedBlock is called by the follower engine after a block has been finalized and the state has been updated. Receives block finalized events from the finalization distributor and forwards them to the consumer.

func (*Engine) Process

func (e *Engine) Process(chanName channels.Channel, originID flow.Identifier, event interface{}) error

Process processes the given event from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.

No errors are expected during normal operations.

type FinalizedBlockProcessor

type FinalizedBlockProcessor struct {
	// contains filtered or unexported fields
}

FinalizedBlockProcessor handles processing of finalized blocks, including indexing and syncing of related collections and execution results.

FinalizedBlockProcessor is designed to handle the ingestion of finalized Flow blocks in a scalable and decoupled manner. It uses a jobqueue.ComponentConsumer to consume and process finalized block jobs asynchronously. This design enables the processor to handle high-throughput block finalization events without blocking other parts of the system.

The processor relies on a notifier (engine.Notifier) to signal when a new finalized block is available, which triggers the job consumer to process it. The actual processing involves indexing block-to-collection and block-to-execution-result mappings, as well as requesting the associated collections.

func NewFinalizedBlockProcessor

func NewFinalizedBlockProcessor(
	log zerolog.Logger,
	state protocol.State,
	blocks storage.Blocks,
	executionResults storage.ExecutionResults,
	finalizedProcessedHeight storage.ConsumerProgressInitializer,
	syncer *collections.Syncer,
	collectionExecutedMetric module.CollectionExecutedMetric,
) (*FinalizedBlockProcessor, error)

NewFinalizedBlockProcessor creates and initializes a new FinalizedBlockProcessor, setting up job consumer infrastructure to handle finalized block processing.

No errors are expected during normal operations.

func (*FinalizedBlockProcessor) Notify

func (p *FinalizedBlockProcessor) Notify()

Notify notifies the processor that a new finalized block is available for processing.

func (*FinalizedBlockProcessor) StartWorkerLoop

StartWorkerLoop begins processing of finalized blocks and signals readiness when initialization is complete.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL