Documentation
Index ¶
- Variables
- func IndexCollection(lctx lockctx.Proof, collection *flow.Collection, ...) error
- func LedgerPayloadFixture(owner string, key string, value []byte) *ledger.Payload
- func LedgerRandomPayloadFixture(t *testing.T) *ledger.Payload
- func TrieUpdateRandomLedgerPayloadsFixture(t *testing.T) *ledger.TrieUpdate
- func TrieUpdateWithPayloadsFixture(t *testing.T, payloads []*ledger.Payload) *ledger.TrieUpdate
- func ValidateTxErrors(results []flow.LightTransactionResult, ...) error
- type CollectionExecutedMetricImpl
- func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block)
- func (c *CollectionExecutedMetricImpl) CollectionExecuted(light *flow.LightCollection)
- func (c *CollectionExecutedMetricImpl) CollectionFinalized(light *flow.LightCollection)
- func (c *CollectionExecutedMetricImpl) ExecutionReceiptReceived(r *flow.ExecutionReceipt)
- func (c *CollectionExecutedMetricImpl) UpdateLastFullBlockHeight(height uint64)
- type InMemoryIndexer
- type Indexer
- type IndexerCore
- type IndexerData
Constants ¶
This section is empty.
Variables ¶
var ErrIndexNotInitialized = errors.New("index not initialized")
ErrIndexNotInitialized is returned when the indexer is not initialized.
This generally indicates that the index databases are still being initialized, and trying again later may succeed.
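Because the error signals a temporary condition, a caller may choose to retry. The following is a minimal sketch of that pattern, not code from this package: `errIndexNotInitialized` is a local stand-in for `ErrIndexNotInitialized` so the example compiles on its own, and `retryUntilInitialized`, its parameters, and the simulated query are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errIndexNotInitialized is a local stand-in for the package's
// ErrIndexNotInitialized sentinel (assumption: callers match it with errors.Is).
var errIndexNotInitialized = errors.New("index not initialized")

// retryUntilInitialized calls query until it stops reporting that the index
// is not initialized, or until attempts run out. Any other error is returned
// immediately. This helper is hypothetical and exists only for illustration.
func retryUntilInitialized(attempts int, delay time.Duration, query func() (uint64, error)) (uint64, error) {
	var lastErr error = errIndexNotInitialized
	for i := 0; i < attempts; i++ {
		height, err := query()
		if err == nil {
			return height, nil
		}
		if !errors.Is(err, errIndexNotInitialized) {
			return 0, err
		}
		lastErr = err
		time.Sleep(delay)
	}
	return 0, fmt.Errorf("index still not initialized after %d attempts: %w", attempts, lastErr)
}

func main() {
	calls := 0
	// Simulated query: fails twice while the index is bootstrapping, then succeeds.
	query := func() (uint64, error) {
		calls++
		if calls < 3 {
			return 0, fmt.Errorf("highest indexed height: %w", errIndexNotInitialized)
		}
		return 42, nil
	}

	height, err := retryUntilInitialized(5, time.Millisecond, query)
	fmt.Println(height, err, calls)
}
```

Matching with `errors.Is` rather than `==` keeps the check working when the sentinel has been wrapped with additional context.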
Functions ¶
func IndexCollection ¶ added in v0.43.0
func IndexCollection(
	lctx lockctx.Proof,
	collection *flow.Collection,
	collections storage.Collections,
	logger zerolog.Logger,
	collectionExecutedMetric module.CollectionExecutedMetric,
) error
IndexCollection handles the response of the collection request made earlier when a block was received.
No error returns are expected during normal operations.
func LedgerPayloadFixture ¶ added in v0.43.0
LedgerPayloadFixture creates a ledger payload with the specified owner, key, and value. It constructs a proper ledger key with owner and key parts and returns a payload suitable for testing ledger operations.
func LedgerRandomPayloadFixture ¶ added in v0.43.0
LedgerRandomPayloadFixture creates a single test payload with a random owner, key, and value for use in ledger and register testing scenarios.
func TrieUpdateRandomLedgerPayloadsFixture ¶ added in v0.43.0
func TrieUpdateRandomLedgerPayloadsFixture(t *testing.T) *ledger.TrieUpdate
TrieUpdateRandomLedgerPayloadsFixture creates a test trie update with multiple test payloads for use in testing register persistence functionality.
func TrieUpdateWithPayloadsFixture ¶ added in v0.43.0
TrieUpdateWithPayloadsFixture creates a trie update from the provided payloads. It extracts keys and values from payloads and constructs a proper ledger update and trie update structure for testing purposes.
func ValidateTxErrors ¶
func ValidateTxErrors(results []flow.LightTransactionResult, txResultErrMsgs []flow.TransactionResultErrorMessage) error
ValidateTxErrors validates that the transaction results and error messages are consistent, and returns an error if they are not.
All error returns are benign and side-effect free for the node. They indicate that the transaction results and error messages are inconsistent, which points to invalid data produced by an external node.
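The kind of consistency check described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `txResult` and `txErrorMessage` are simplified, hypothetical stand-ins for `flow.LightTransactionResult` and `flow.TransactionResultErrorMessage`, and `checkTxErrors` is a made-up name.

```go
package main

import "fmt"

// txResult and txErrorMessage are simplified, hypothetical stand-ins for
// flow.LightTransactionResult and flow.TransactionResultErrorMessage.
type txResult struct {
	TransactionID string
	Failed        bool
}

type txErrorMessage struct {
	TransactionID string
	ErrorMessage  string
}

// checkTxErrors sketches a one-to-one consistency check: every failed
// transaction needs exactly one error message, and no message may refer to
// a transaction that succeeded or is unknown.
func checkTxErrors(results []txResult, msgs []txErrorMessage) error {
	failed := make(map[string]struct{})
	for _, r := range results {
		if r.Failed {
			failed[r.TransactionID] = struct{}{}
		}
	}

	seen := make(map[string]struct{}, len(msgs))
	for _, m := range msgs {
		if _, ok := failed[m.TransactionID]; !ok {
			return fmt.Errorf("error message for transaction %s, which did not fail", m.TransactionID)
		}
		if _, dup := seen[m.TransactionID]; dup {
			return fmt.Errorf("duplicate error message for transaction %s", m.TransactionID)
		}
		seen[m.TransactionID] = struct{}{}
	}

	if len(seen) != len(failed) {
		return fmt.Errorf("expected %d error messages, got %d", len(failed), len(seen))
	}
	return nil
}

func main() {
	results := []txResult{
		{TransactionID: "tx1", Failed: true},
		{TransactionID: "tx2", Failed: false},
	}

	// Consistent: one message for the one failed transaction.
	fmt.Println(checkTxErrors(results, []txErrorMessage{{TransactionID: "tx1", ErrorMessage: "out of gas"}}))

	// Inconsistent: the failed transaction has no message.
	fmt.Println(checkTxErrors(results, nil))
}
```

The function only reads its inputs, which mirrors the "benign and side-effect free" error contract stated above.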
Types ¶
type CollectionExecutedMetricImpl ¶ added in v0.33.15
type CollectionExecutedMetricImpl struct {
// contains filtered or unexported fields
}
CollectionExecutedMetricImpl tracks metrics that measure how long it takes for transactions to reach each step in their lifecycle.
func NewCollectionExecutedMetricImpl ¶ added in v0.33.15
func NewCollectionExecutedMetricImpl(
	log zerolog.Logger,
	accessMetrics module.AccessMetrics,
	collectionsToMarkFinalized *stdmap.Times,
	collectionsToMarkExecuted *stdmap.Times,
	blocksToMarkExecuted *stdmap.Times,
	collections storage.Collections,
	blocks storage.Blocks,
	blockTransactions *stdmap.IdentifierMap,
) (*CollectionExecutedMetricImpl, error)
func (*CollectionExecutedMetricImpl) BlockFinalized ¶ added in v0.33.15
func (c *CollectionExecutedMetricImpl) BlockFinalized(block *flow.Block)
BlockFinalized tracks finalized metric for block
func (*CollectionExecutedMetricImpl) CollectionExecuted ¶ added in v0.33.15
func (c *CollectionExecutedMetricImpl) CollectionExecuted(light *flow.LightCollection)
CollectionExecuted tracks collections to mark executed
func (*CollectionExecutedMetricImpl) CollectionFinalized ¶ added in v0.33.15
func (c *CollectionExecutedMetricImpl) CollectionFinalized(light *flow.LightCollection)
CollectionFinalized tracks collections to mark finalized
func (*CollectionExecutedMetricImpl) ExecutionReceiptReceived ¶ added in v0.33.15
func (c *CollectionExecutedMetricImpl) ExecutionReceiptReceived(r *flow.ExecutionReceipt)
ExecutionReceiptReceived tracks execution receipt metrics
func (*CollectionExecutedMetricImpl) UpdateLastFullBlockHeight ¶ added in v0.33.15
func (c *CollectionExecutedMetricImpl) UpdateLastFullBlockHeight(height uint64)
type InMemoryIndexer ¶ added in v0.43.0
type InMemoryIndexer struct {
// contains filtered or unexported fields
}
<component_spec> InMemoryIndexer indexes block execution data for a single ExecutionResult into a mempool. It is designed to be used as part of the optimistic syncing processing pipeline, to index data for unsealed execution results which is eventually persisted when the execution result is sealed.
The data contained within the BlockExecutionData is verified by verification nodes as part of the approval process. Once the execution result is sealed, the Access node can accept it as valid without further verification. However, with optimistic syncing, the Access node may index data for execution results that are not sealed. Since this data is not certified by the protocol, it must not be persisted to disk. It may be used by the Access node to serve Access API requests, with the understanding that it may be later determined to be invalid.
The provided BlockExecutionData is received over the network and its hash is compared to the value included in an ExecutionResult within a certified block. This guarantees it is the same data that was produced by an execution node whose stake is at risk if the data is incorrect. It is not practical for an Access node to verify all data, but the indexer may perform opportunistic checks to ensure the data is generally consistent.
Transaction error messages are received directly from execution nodes with no protocol guarantees. The node must validate that there is a one-to-one mapping between failed transactions and transaction error messages. Since the error messages are requested directly from the execution nodes, it's possible that they are delayed. To avoid blocking the indexing process if ENs are unresponsive, the processing pipeline may skip the call to `ValidateTxErrors()` if the error messages are not ready. In this case, the error messages may be validated and backfilled later. </component_spec>
Safe for concurrent use.
func NewInMemoryIndexer ¶ added in v0.43.0
func NewInMemoryIndexer(
	log zerolog.Logger,
	block *flow.Block,
	executionResult *flow.ExecutionResult,
) (*InMemoryIndexer, error)
NewInMemoryIndexer returns a new indexer that indexes block execution data and error messages for a single ExecutionResult.
No error returns are expected during normal operations.
func (*InMemoryIndexer) IndexBlockData ¶ added in v0.43.0
func (i *InMemoryIndexer) IndexBlockData(data *execution_data.BlockExecutionData) (*IndexerData, error)
IndexBlockData indexes all execution block data.
The method is idempotent and does not modify the state of the indexer.
All error returns are benign and side-effect free for the node. They indicate that the BlockExecutionData is inconsistent with the execution result and its block, which points to invalid data produced by an external node.
type Indexer ¶
type Indexer struct {
	component.Component
	execution_data.ProcessedHeightRecorder
	// contains filtered or unexported fields
}
Indexer handles ingestion of newly available execution data and uses the execution data indexer module to index it. New data is processed by a jobqueue that uses the execution data reader to obtain new jobs. The worker also implements the `highestConsecutiveHeight` method, which the execution data reader uses to ensure it does not go beyond the highest sealed block height when fetching data. The execution state worker exposes a callback that the upstream queues, which download new execution data, use to signal that new data is available and kick off indexing.
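The height-bounded consumption described above can be pictured with the rough sketch below. It does not use the real jobqueue or execution data reader: `boundedReader`, `consume`, `errNotAvailable`, and the string payloads are all hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotAvailable is a hypothetical sentinel meaning "no job at this height yet".
var errNotAvailable = errors.New("execution data not available yet")

// boundedReader hands out data by height but never beyond the height
// reported by highestHeight. All names here are illustrative.
type boundedReader struct {
	highestHeight func() (uint64, error)
	fetch         func(height uint64) (string, error)
}

// atHeight returns the data for height, or errNotAvailable if height is
// above the current upper bound.
func (r *boundedReader) atHeight(height uint64) (string, error) {
	highest, err := r.highestHeight()
	if err != nil {
		return "", fmt.Errorf("could not get highest height: %w", err)
	}
	if height > highest {
		return "", errNotAvailable
	}
	return r.fetch(height)
}

// consume indexes consecutive heights starting at next until the reader
// reports that no more data is available. It returns the next height to process.
func consume(r *boundedReader, next uint64, index func(data string) error) (uint64, error) {
	for {
		data, err := r.atHeight(next)
		if errors.Is(err, errNotAvailable) {
			return next, nil
		}
		if err != nil {
			return next, err
		}
		if err := index(data); err != nil {
			return next, err
		}
		next++
	}
}

func main() {
	highest := uint64(102)
	r := &boundedReader{
		highestHeight: func() (uint64, error) { return highest, nil },
		fetch:         func(h uint64) (string, error) { return fmt.Sprintf("data@%d", h), nil },
	}

	var indexed []string
	index := func(d string) error { indexed = append(indexed, d); return nil }

	// First pass: processes heights 100 through 102, then stops at the bound.
	next, err := consume(r, 100, index)
	fmt.Println(next, err, indexed)

	// A notification that new data arrived would trigger another pass.
	highest = 104
	next, err = consume(r, next, index)
	fmt.Println(next, err, len(indexed))
}
```

In this sketch the notification callback does not carry the data itself; it only wakes the consumer, which then pulls whatever is available up to the bound.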
func NewIndexer ¶
func NewIndexer(
	log zerolog.Logger,
	initHeight uint64,
	registers storage.RegisterIndex,
	indexer *IndexerCore,
	executionCache *cache.ExecutionDataCache,
	executionDataLatestHeight func() (uint64, error),
	processedHeightInitializer storage.ConsumerProgressInitializer,
) (*Indexer, error)
NewIndexer creates a new execution worker.
func (*Indexer) HighestIndexedHeight ¶
HighestIndexedHeight returns the highest height indexed by the execution indexer.
func (*Indexer) LowestIndexedHeight ¶
LowestIndexedHeight returns the lowest height indexed by the execution indexer.
func (*Indexer) OnExecutionData ¶
func (i *Indexer) OnExecutionData(_ *execution_data.BlockExecutionDataEntity)
OnExecutionData is used to notify when new execution data is downloaded by the execution data requester jobqueue.
func (*Indexer) Start ¶
func (i *Indexer) Start(ctx irrecoverable.SignalerContext)
Start starts the worker jobqueue to consume the available data.
type IndexerCore ¶
type IndexerCore struct {
// contains filtered or unexported fields
}
IndexerCore indexes the execution state.
func New ¶
func New(
	log zerolog.Logger,
	metrics module.ExecutionStateIndexerMetrics,
	protocolDB storage.DB,
	registers storage.RegisterIndex,
	headers storage.Headers,
	events storage.Events,
	collections storage.Collections,
	transactions storage.Transactions,
	results storage.LightTransactionResults,
	scheduledTransactions storage.ScheduledTransactions,
	chainID flow.ChainID,
	derivedChainData *derived.DerivedChainData,
	collectionExecutedMetric module.CollectionExecutedMetric,
	lockManager lockctx.Manager,
) *IndexerCore
New creates a new execution state indexer, which ingests block execution data and indexes it by height. The passed RegisterIndex storage must already be populated with the first and last height; otherwise the indexer won't be initialized. This ensures that the storage has been bootstrapped first.
func (*IndexerCore) IndexBlockData ¶
func (c *IndexerCore) IndexBlockData(data *execution_data.BlockExecutionDataEntity) error
IndexBlockData indexes all execution block data by height. This method shouldn't be used concurrently.
Expected error returns during normal operations:
- storage.ErrNotFound if the block for execution data was not found
func (*IndexerCore) RegisterValue ¶ added in v0.32.7
func (c *IndexerCore) RegisterValue(ID flow.RegisterID, height uint64) (flow.RegisterValue, error)
RegisterValue retrieves the register value for the given register ID at the provided block height. If the register wasn't indexed at exactly the provided height, it returns the value from the highest height at which the register was indexed. If a register is not found, it returns a nil value and no error.
Expected error returns during normal operation:
- storage.ErrHeightNotIndexed: if the given height was not indexed yet or lower than the first indexed height.
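The lookup semantics described above can be illustrated with a toy in-memory index. This is a sketch under assumptions, not the storage implementation: `toyRegisterIndex`, `entry`, and `errHeightNotIndexed` (a local stand-in for `storage.ErrHeightNotIndexed`) are hypothetical, and the sketch assumes "highest height" means the most recent write at or below the requested height.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// errHeightNotIndexed is a local stand-in for storage.ErrHeightNotIndexed.
var errHeightNotIndexed = errors.New("height not indexed")

// entry records the value a register took at a given height.
type entry struct {
	height uint64
	value  []byte
}

// toyRegisterIndex is a hypothetical in-memory index. Entries for each
// register are kept sorted by ascending height.
type toyRegisterIndex struct {
	first, latest uint64
	registers     map[string][]entry
}

// registerValue returns the most recent value written at or below height.
func (idx *toyRegisterIndex) registerValue(id string, height uint64) ([]byte, error) {
	if height < idx.first || height > idx.latest {
		return nil, errHeightNotIndexed
	}
	entries := idx.registers[id]
	// Find the first entry above the requested height; the entry before it
	// is the most recent write at or below that height.
	i := sort.Search(len(entries), func(i int) bool { return entries[i].height > height })
	if i == 0 {
		return nil, nil // register not found: nil value, no error
	}
	return entries[i-1].value, nil
}

func main() {
	idx := &toyRegisterIndex{
		first:  10,
		latest: 20,
		registers: map[string][]entry{
			"balance": {{height: 10, value: []byte("100")}, {height: 15, value: []byte("250")}},
		},
	}

	v, err := idx.registerValue("balance", 12) // falls back to the write at height 10
	fmt.Printf("%s %v\n", v, err)

	v, err = idx.registerValue("missing", 12) // unknown register
	fmt.Println(v == nil, err)

	_, err = idx.registerValue("balance", 5) // below the first indexed height
	fmt.Println(errors.Is(err, errHeightNotIndexed))
}
```

Returning a nil value without an error for an unknown register lets callers distinguish "never written" from "height not indexed", which is the only expected error listed above.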
type IndexerData ¶
type IndexerData struct {
	Events       []flow.Event
	Collections  []*flow.Collection
	Transactions []*flow.TransactionBody
	Results      []flow.LightTransactionResult
	Registers    []flow.RegisterEntry
}
IndexerData is the collection of data ingested by the indexer.