store

package
v0.42.1-unsafe-followe... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2025 License: AGPL-3.0 Imports: 16 Imported by: 1

Documentation

Index

Constants

View Source
const DefaultCacheSize = uint(1000)
View Source
const DefaultChunkQueuesCacheSize = uint(1000)
View Source
const JobQueueChunksQueue = "JobQueueChunksQueue"

Variables

View Source
var DefaultEpochProtocolStateCacheSize uint = 20

DefaultEpochProtocolStateCacheSize is the default size for primary epoch protocol state entry cache. Minimally, we have 3 entries per epoch (one on epoch Switchover, one on receiving the Epoch Setup and one when seeing the Epoch Commit event). Let's be generous and assume we have 20 different epoch state entries per epoch.

View Source
var DefaultProtocolKVStoreByBlockIDCacheSize uint = 1000

DefaultProtocolKVStoreByBlockIDCacheSize is the default value for secondary index `byBlockIdCache`. We want to be able to cover a broad interval of views without cache misses, so we use a bigger value. Generally, many blocks will reference the same KV store snapshot.

View Source
var DefaultProtocolKVStoreCacheSize uint = 10

DefaultProtocolKVStoreCacheSize is the default size for primary protocol KV store cache. KV store is rarely updated, so we will have a limited number of unique snapshots. Let's be generous and assume we have 10 different KV stores used at the same time.

View Source
var DefaultProtocolStateIndexCacheSize uint = 1000

DefaultProtocolStateIndexCacheSize is the default value for secondary byBlockIdCache. We want to be able to cover a broad interval of views without cache misses, so we use a bigger value.

Functions

func KeyToBlockIDIndex added in v0.39.4

func KeyToBlockIDIndex(key IdentifierAndUint32) (flow.Identifier, uint32)

func KeyToBlockIDTransactionID added in v0.39.4

func KeyToBlockIDTransactionID(key TwoIdentifier) (flow.Identifier, flow.Identifier)

Types

type All added in v0.43.0

type All struct {
	Headers            *Headers
	Guarantees         *Guarantees
	Seals              *Seals
	Index              *Index
	Payloads           *Payloads
	Blocks             *Blocks
	QuorumCertificates *QuorumCertificates
	Results            *ExecutionResults
	Receipts           *ExecutionReceipts
	Commits            *Commits

	Setups                    *EpochSetups
	EpochCommits              *EpochCommits
	EpochProtocolStateEntries *EpochProtocolStateEntries
	ProtocolKVStore           *ProtocolKVStore
	VersionBeacons            *VersionBeacons
	Transactions              *Transactions
	Collections               *Collections
}

func InitAll added in v0.43.0

func InitAll(metrics module.CacheMetrics, db storage.DB) *All

type Blocks added in v0.43.0

type Blocks struct {
	// contains filtered or unexported fields
}

Blocks implements a simple block storage around a badger DB.

func NewBlocks added in v0.43.0

func NewBlocks(db storage.DB, headers *Headers, payloads *Payloads) *Blocks

NewBlocks ...

func (*Blocks) BatchStore added in v0.43.0

func (b *Blocks) BatchStore(lctx lockctx.Proof, rw storage.ReaderBatchWriter, block *flow.Block) error

BatchStore stores a valid block in a batch.

func (*Blocks) ByCollectionID added in v0.43.0

func (b *Blocks) ByCollectionID(collID flow.Identifier) (*flow.Block, error)

ByCollectionID returns the block for the given collection ID.

func (*Blocks) ByHeight added in v0.43.0

func (b *Blocks) ByHeight(height uint64) (*flow.Block, error)

ByHeight returns the block at the given height. It is only available for finalized blocks.

Expected errors during normal operations: - storage.ErrNotFound if no block is found for the given height

func (*Blocks) ByID added in v0.43.0

func (b *Blocks) ByID(blockID flow.Identifier) (*flow.Block, error)

ByID returns the block with the given hash. It is available for finalized and ambiguous blocks. Expected errors during normal operations: - storage.ErrNotFound if no block is found

func (*Blocks) IndexBlockForCollections

func (b *Blocks) IndexBlockForCollections(blockID flow.Identifier, collIDs []flow.Identifier) error

IndexBlockForCollections indexes the block each collection was included in. This should not be called when finalizing a block

func (*Blocks) StoreTx

func (b *Blocks) StoreTx(block *flow.Block) func(*transaction.Tx) error

TODO: to be removed

type Cache

type Cache[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func (*Cache[K, V]) Get

func (c *Cache[K, V]) Get(r storage.Reader, key K) (V, error)

Get will try to retrieve the resource from cache first, and then from the injected. During normal operations, the following error returns are expected:

  • `storage.ErrNotFound` if key is unknown.

func (*Cache[K, V]) Insert

func (c *Cache[K, V]) Insert(key K, resource V)

Insert will add a resource directly to the cache with the given ID

func (*Cache[K, V]) IsCached

func (c *Cache[K, V]) IsCached(key K) bool

IsCached returns true if the key exists in the cache. It DOES NOT check whether the key exists in the underlying data store.

func (*Cache[K, V]) PutTx

func (c *Cache[K, V]) PutTx(rw storage.ReaderBatchWriter, key K, resource V) error

PutTx will return tx which adds a resource to the cache with the given ID.

func (*Cache[K, V]) PutWithLockTx added in v0.43.0

func (c *Cache[K, V]) PutWithLockTx(lctx lockctx.Proof, rw storage.ReaderBatchWriter, key K, resource V) error

func (*Cache[K, V]) Remove

func (c *Cache[K, V]) Remove(key K)

func (*Cache[K, V]) RemoveTx added in v0.41.0

func (c *Cache[K, V]) RemoveTx(rw storage.ReaderBatchWriter, key K) error

type ChunkDataPacks

type ChunkDataPacks struct {
	// contains filtered or unexported fields
}

func NewChunkDataPacks

func NewChunkDataPacks(collector module.CacheMetrics, db storage.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks

func (*ChunkDataPacks) BatchRemove

func (ch *ChunkDataPacks) BatchRemove(chunkID flow.Identifier, rw storage.ReaderBatchWriter) error

BatchRemove removes ChunkDataPack c keyed by its ChunkID in provided batch No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*ChunkDataPacks) BatchStore

BatchStore stores ChunkDataPack c keyed by its ChunkID in provided batch. No errors are expected during normal operation, but it may return generic error if entity is not serializable or Badger unexpectedly fails to process request

func (*ChunkDataPacks) ByChunkID

func (ch *ChunkDataPacks) ByChunkID(chunkID flow.Identifier) (*flow.ChunkDataPack, error)

func (*ChunkDataPacks) Remove

func (ch *ChunkDataPacks) Remove(chunkIDs []flow.Identifier) error

Remove removes multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch. No errors are expected during normal operation, even if no entries are matched.

func (*ChunkDataPacks) Store

func (ch *ChunkDataPacks) Store(cs []*flow.ChunkDataPack) error

Store stores multiple ChunkDataPacks cs keyed by their ChunkIDs in a batch. No errors are expected during normal operation, but it may return generic error

type ChunksQueue added in v0.39.1

type ChunksQueue struct {
	// contains filtered or unexported fields
}

ChunksQueue stores a queue of chunk locators that assigned to me to verify. Job consumers can read the locators as job from the queue by index. Chunk locators stored in this queue are unique.

func NewChunkQueue added in v0.39.1

func NewChunkQueue(collector module.CacheMetrics, db storage.DB) *ChunksQueue

NewChunkQueue will initialize the underlying badger database of chunk locator queue.

func (*ChunksQueue) AtIndex added in v0.39.1

func (q *ChunksQueue) AtIndex(index uint64) (*chunks.Locator, error)

AtIndex returns the chunk locator stored at the given index in the queue.

func (*ChunksQueue) Init added in v0.39.1

func (q *ChunksQueue) Init(defaultIndex uint64) (bool, error)

Init initializes chunk queue's latest index with the given default index. It returns (false, nil) if the chunk queue is already initialized. It returns (true, nil) if the chunk queue is successfully initialized.

func (*ChunksQueue) LatestIndex added in v0.39.1

func (q *ChunksQueue) LatestIndex() (uint64, error)

LatestIndex returns the index of the latest chunk locator stored in the queue.

func (*ChunksQueue) StoreChunkLocator added in v0.39.1

func (q *ChunksQueue) StoreChunkLocator(locator *chunks.Locator) (bool, error)

StoreChunkLocator stores a new chunk locator that assigned to me to the job queue. A true will be returned, if the locator was new. A false will be returned, if the locator was duplicate.

type Collections added in v0.39.4

type Collections struct {
	// contains filtered or unexported fields
}

func NewCollections added in v0.39.4

func NewCollections(db storage.DB, transactions *Transactions) *Collections

func (*Collections) BatchStoreLightAndIndexByTransaction

func (c *Collections) BatchStoreLightAndIndexByTransaction(collection *flow.LightCollection, batch storage.ReaderBatchWriter) error

BatchStoreLightAndIndexByTransaction stores a light collection and indexes it by transaction ID within a batch operation. No errors are expected during normal operations

func (*Collections) ByID added in v0.39.4

func (c *Collections) ByID(colID flow.Identifier) (*flow.Collection, error)

ByID retrieves a collection by its ID.

func (*Collections) LightByID added in v0.39.4

func (c *Collections) LightByID(colID flow.Identifier) (*flow.LightCollection, error)

LightByID retrieves a light collection by its ID.

func (*Collections) LightByTransactionID added in v0.39.4

func (c *Collections) LightByTransactionID(txID flow.Identifier) (*flow.LightCollection, error)

LightByTransactionID retrieves a light collection by a transaction ID.

func (*Collections) Remove added in v0.39.4

func (c *Collections) Remove(colID flow.Identifier) error

Remove removes a collection from the database, including all constituent transactions and indices inserted by Store. Remove does not error if the collection does not exist Note: this method should only be called for collections included in blocks below sealed height No errors are expected during normal operation.

func (*Collections) Store added in v0.39.4

func (c *Collections) Store(collection *flow.Collection) error

Store stores a collection in the database. any error returned are exceptions

func (*Collections) StoreLightAndIndexByTransaction added in v0.39.4

func (c *Collections) StoreLightAndIndexByTransaction(collection *flow.LightCollection) error

StoreLightAndIndexByTransaction stores a light collection and indexes it by transaction ID. It's concurrent-safe. any error returned are exceptions

type Commits added in v0.39.4

type Commits struct {
	// contains filtered or unexported fields
}

func NewCommits added in v0.39.4

func NewCommits(collector module.CacheMetrics, db storage.DB) *Commits

func (*Commits) BatchRemoveByBlockID added in v0.39.4

func (c *Commits) BatchRemoveByBlockID(blockID flow.Identifier, rw storage.ReaderBatchWriter) error

BatchRemoveByBlockID removes Commit keyed by blockID in provided batch No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*Commits) BatchStore added in v0.39.4

func (c *Commits) BatchStore(blockID flow.Identifier, commit flow.StateCommitment, rw storage.ReaderBatchWriter) error

BatchStore stores Commit keyed by blockID in provided batch No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*Commits) ByBlockID added in v0.39.4

func (c *Commits) ByBlockID(blockID flow.Identifier) (flow.StateCommitment, error)

func (*Commits) RemoveByBlockID added in v0.39.4

func (c *Commits) RemoveByBlockID(blockID flow.Identifier) error

func (*Commits) Store added in v0.39.4

func (c *Commits) Store(blockID flow.Identifier, commit flow.StateCommitment) error

type ComputationResultUploadStatus added in v0.39.2

type ComputationResultUploadStatus struct {
	// contains filtered or unexported fields
}

func NewComputationResultUploadStatus added in v0.39.2

func NewComputationResultUploadStatus(db storage.DB) *ComputationResultUploadStatus

func (*ComputationResultUploadStatus) ByID added in v0.39.2

func (c *ComputationResultUploadStatus) ByID(computationResultID flow.Identifier) (bool, error)

func (*ComputationResultUploadStatus) GetIDsByUploadStatus added in v0.39.2

func (c *ComputationResultUploadStatus) GetIDsByUploadStatus(targetUploadStatus bool) ([]flow.Identifier, error)

func (*ComputationResultUploadStatus) Remove added in v0.39.2

func (c *ComputationResultUploadStatus) Remove(computationResultID flow.Identifier) error

func (*ComputationResultUploadStatus) Upsert added in v0.39.2

func (c *ComputationResultUploadStatus) Upsert(blockID flow.Identifier,
	wasUploadCompleted bool) error

type ConsumerProgressInitializer

type ConsumerProgressInitializer struct {
	// contains filtered or unexported fields
}

ConsumerProgressInitializer is a helper to initialize the consumer progress index in storage It prevents the consumer from being used before initialization

func NewConsumerProgress

func NewConsumerProgress(db storage.DB, consumer string) *ConsumerProgressInitializer

func (*ConsumerProgressInitializer) Initialize

func (cpi *ConsumerProgressInitializer) Initialize(defaultIndex uint64) (storage.ConsumerProgress, error)

type EpochCommits added in v0.43.0

type EpochCommits struct {
	// contains filtered or unexported fields
}

func NewEpochCommits added in v0.43.0

func NewEpochCommits(collector module.CacheMetrics, db storage.DB) *EpochCommits

func (*EpochCommits) BatchStore added in v0.43.0

func (ec *EpochCommits) BatchStore(rw storage.ReaderBatchWriter, commit *flow.EpochCommit) error

func (*EpochCommits) ByID added in v0.43.0

func (ec *EpochCommits) ByID(commitID flow.Identifier) (*flow.EpochCommit, error)

ByID will return the EpochCommit event by its ID. Error returns: * storage.ErrNotFound if no EpochCommit with the ID exists

type EpochProtocolStateEntries added in v0.43.0

type EpochProtocolStateEntries struct {
	// contains filtered or unexported fields
}

EpochProtocolStateEntries implements a persistent, fork-aware storage for the Epoch-related sub-states of the overall of the overall Protocol State (KV Store). It uses an embedded cache which is populated on first retrieval to speed up access to frequently used epoch sub-state.

func NewEpochProtocolStateEntries added in v0.43.0

func NewEpochProtocolStateEntries(collector module.CacheMetrics,
	epochSetups storage.EpochSetups,
	epochCommits storage.EpochCommits,
	db storage.DB,
	stateCacheSize uint,
	stateByBlockIDCacheSize uint,
) *EpochProtocolStateEntries

NewEpochProtocolStateEntries creates a EpochProtocolStateEntries instance, which stores a subset of the state stored by the Dynamic Protocol State. It supports storing, caching and retrieving by ID or the additionally indexed block ID.

func (*EpochProtocolStateEntries) BatchIndex added in v0.43.0

func (s *EpochProtocolStateEntries) BatchIndex(rw storage.ReaderBatchWriter, blockID flow.Identifier, epochProtocolStateEntryID flow.Identifier) error

BatchIndex persists the specific map entry in the node's database. In a nutshell, we want to maintain a map from `blockID` to `epochStateEntry`, where `blockID` references the block that _proposes_ the referenced epoch protocol state entry. Protocol convention:

  • Consider block B, whose ingestion might potentially lead to an updated protocol state. For example, the protocol state changes if we seal some execution results emitting service events.
  • For the key `blockID`, we use the identity of block B which _proposes_ this Protocol State. As value, the hash of the resulting protocol state at the end of processing B is to be used.
  • CAUTION: The protocol state requires confirmation by a QC and will only become active at the child block, _after_ validating the QC.

No errors are expected during normal operation.

func (*EpochProtocolStateEntries) BatchStore added in v0.43.0

func (s *EpochProtocolStateEntries) BatchStore(
	w storage.Writer,
	epochProtocolStateEntryID flow.Identifier,
	epochStateEntry *flow.MinEpochStateEntry,
) error

BatchStore returns persists the given epoch protocol state entry as part of a DB batch. Per convention, the identities in the flow.MinEpochStateEntry must be in canonical order for the current and next epoch (if present), otherwise an exception is returned. No errors are expected during normal operation.

func (*EpochProtocolStateEntries) ByBlockID added in v0.43.0

ByBlockID retrieves the epoch protocol state entry that the block with the given ID proposes. CAUTION: this protocol state requires confirmation by a QC and will only become active at the child block, _after_ validating the QC. Protocol convention:

  • Consider block B, whose ingestion might potentially lead to an updated protocol state. For example, the protocol state changes if we seal some execution results emitting service events.
  • For the key `blockID`, we use the identity of block B which _proposes_ this Protocol State. As value, the hash of the resulting protocol state at the end of processing B is to be used.
  • CAUTION: The protocol state requires confirmation by a QC and will only become active at the child block, _after_ validating the QC.

Expected errors during normal operations:

  • storage.ErrNotFound if no state entry has been indexed for the given block.

func (*EpochProtocolStateEntries) ByID added in v0.43.0

func (s *EpochProtocolStateEntries) ByID(epochProtocolStateEntryID flow.Identifier) (*flow.RichEpochStateEntry, error)

ByID returns the epoch protocol state entry by its ID. Expected errors during normal operations:

  • storage.ErrNotFound if no protocol state with the given Identifier is known.

type EpochSetups added in v0.43.0

type EpochSetups struct {
	// contains filtered or unexported fields
}

func NewEpochSetups added in v0.43.0

func NewEpochSetups(collector module.CacheMetrics, db storage.DB) *EpochSetups

NewEpochSetups instantiates a new EpochSetups storage.

func (*EpochSetups) BatchStore added in v0.43.0

func (es *EpochSetups) BatchStore(rw storage.ReaderBatchWriter, setup *flow.EpochSetup) error

No errors are expected during normal operation.

func (*EpochSetups) ByID added in v0.43.0

func (es *EpochSetups) ByID(setupID flow.Identifier) (*flow.EpochSetup, error)

ByID will return the EpochSetup event by its ID. Error returns: * storage.ErrNotFound if no EpochSetup with the ID exists

type Events added in v0.39.4

type Events struct {
	// contains filtered or unexported fields
}

func NewEvents added in v0.39.4

func NewEvents(collector module.CacheMetrics, db storage.DB) *Events

func (*Events) BatchRemoveByBlockID added in v0.39.4

func (e *Events) BatchRemoveByBlockID(blockID flow.Identifier, rw storage.ReaderBatchWriter) error

BatchRemoveByBlockID removes events keyed by a blockID in provided batch No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*Events) BatchStore added in v0.39.4

func (e *Events) BatchStore(blockID flow.Identifier, blockEvents []flow.EventsList, batch storage.ReaderBatchWriter) error

BatchStore stores events keyed by a blockID in provided batch No errors are expected during normal operation, but it may return generic error if badger fails to process request

func (*Events) ByBlockID added in v0.39.4

func (e *Events) ByBlockID(blockID flow.Identifier) ([]flow.Event, error)

ByBlockID returns the events for the given block ID Note: This method will return an empty slice and no error if no entries for the blockID are found

func (*Events) ByBlockIDEventType added in v0.39.4

func (e *Events) ByBlockIDEventType(blockID flow.Identifier, eventType flow.EventType) ([]flow.Event, error)

ByBlockIDEventType returns the events for the given block ID and event type Note: This method will return an empty slice and no error if no entries for the blockID are found

func (*Events) ByBlockIDTransactionID added in v0.39.4

func (e *Events) ByBlockIDTransactionID(blockID flow.Identifier, txID flow.Identifier) ([]flow.Event, error)

ByBlockIDTransactionID returns the events for the given block ID and transaction ID Note: This method will return an empty slice and no error if no entries for the blockID are found

func (*Events) ByBlockIDTransactionIndex added in v0.39.4

func (e *Events) ByBlockIDTransactionIndex(blockID flow.Identifier, txIndex uint32) ([]flow.Event, error)

ByBlockIDTransactionIndex returns the events for the given block ID and transaction index Note: This method will return an empty slice and no error if no entries for the blockID are found

func (*Events) RemoveByBlockID added in v0.39.4

func (e *Events) RemoveByBlockID(blockID flow.Identifier) error

RemoveByBlockID removes events by block ID

func (*Events) Store added in v0.39.4

func (e *Events) Store(blockID flow.Identifier, blockEvents []flow.EventsList) error

Store will store events for the given block ID

type ExecutionForkEvidence added in v0.42.1

type ExecutionForkEvidence struct {
	// contains filtered or unexported fields
}

ExecutionForkEvidence represents persistent storage for execution fork evidence.

func NewExecutionForkEvidence added in v0.42.1

func NewExecutionForkEvidence(db storage.DB) *ExecutionForkEvidence

NewExecutionForkEvidence creates a new ExecutionForkEvidence store.

func (*ExecutionForkEvidence) Retrieve added in v0.42.1

Retrieve reads conflicting seals from the database. No error is returned if database record doesn't exist. No errors are expected during normal operations.

func (*ExecutionForkEvidence) StoreIfNotExists added in v0.42.1

func (efe *ExecutionForkEvidence) StoreIfNotExists(conflictingSeals []*flow.IncorporatedResultSeal) error

StoreIfNotExists stores the given conflictingSeals to the database. This is a no-op if there is already a record in the database with the same key. No errors are expected during normal operations. CAUTION: This function is not safe for concurrent use by multiple goroutines. For safety, we rely on the fact that Execution Fork Evidence has a very small surface area of use in the Execution Fork Suppressor. The Execution Fork Suppressor is responsible for ensuring mutually exclusive access to this function.

type ExecutionReceipts added in v0.39.4

type ExecutionReceipts struct {
	// contains filtered or unexported fields
}

ExecutionReceipts implements storage for execution receipts.

func NewExecutionReceipts added in v0.39.4

func NewExecutionReceipts(collector module.CacheMetrics, db storage.DB, results storage.ExecutionResults, cacheSize uint) *ExecutionReceipts

NewExecutionReceipts Creates ExecutionReceipts instance which is a database of receipts which supports storing and indexing receipts by receipt ID and block ID.

func (*ExecutionReceipts) BatchStore added in v0.39.4

func (*ExecutionReceipts) ByBlockID added in v0.39.4

ByBlockID retrieves list of execution receipts from the storage

No errors are expected errors during normal operations.

func (*ExecutionReceipts) ByID added in v0.39.4

func (*ExecutionReceipts) Store added in v0.39.4

func (r *ExecutionReceipts) Store(receipt *flow.ExecutionReceipt) error

type ExecutionResults added in v0.39.4

type ExecutionResults struct {
	// contains filtered or unexported fields
}

ExecutionResults implements persistent storage for execution results.

func NewExecutionResults added in v0.39.4

func NewExecutionResults(collector module.CacheMetrics, db storage.DB) *ExecutionResults

func (*ExecutionResults) BatchIndex added in v0.39.4

func (r *ExecutionResults) BatchIndex(blockID flow.Identifier, resultID flow.Identifier, batch storage.ReaderBatchWriter) error

func (*ExecutionResults) BatchRemoveIndexByBlockID added in v0.39.4

func (r *ExecutionResults) BatchRemoveIndexByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) error

BatchRemoveIndexByBlockID removes blockID-to-executionResultID index entries keyed by blockID in a provided batch. No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*ExecutionResults) BatchStore added in v0.39.4

func (r *ExecutionResults) BatchStore(result *flow.ExecutionResult, batch storage.ReaderBatchWriter) error

func (*ExecutionResults) ByBlockID added in v0.39.4

func (r *ExecutionResults) ByBlockID(blockID flow.Identifier) (*flow.ExecutionResult, error)

func (*ExecutionResults) ByID added in v0.39.4

func (r *ExecutionResults) ByID(resultID flow.Identifier) (*flow.ExecutionResult, error)

func (*ExecutionResults) ForceIndex added in v0.39.4

func (r *ExecutionResults) ForceIndex(blockID flow.Identifier, resultID flow.Identifier) error

func (*ExecutionResults) Index added in v0.39.4

func (r *ExecutionResults) Index(blockID flow.Identifier, resultID flow.Identifier) error

Index indexes an execution result by block ID. Note: this method call is not concurrent safe, because it checks if the different result is already indexed by the same blockID, and if it is, it returns an error. The caller needs to ensure that there is no concurrent call to this method with the same blockID.

func (*ExecutionResults) RemoveIndexByBlockID added in v0.39.4

func (r *ExecutionResults) RemoveIndexByBlockID(blockID flow.Identifier) error

func (*ExecutionResults) Store added in v0.39.4

func (r *ExecutionResults) Store(result *flow.ExecutionResult) error

type Guarantees added in v0.43.0

type Guarantees struct {
	// contains filtered or unexported fields
}

Guarantees implements persistent storage for collection guarantees.

func NewGuarantees added in v0.43.0

func NewGuarantees(collector module.CacheMetrics, db storage.DB, cacheSize uint) *Guarantees

func (*Guarantees) ByCollectionID added in v0.43.0

func (g *Guarantees) ByCollectionID(collID flow.Identifier) (*flow.CollectionGuarantee, error)

type Headers added in v0.43.0

type Headers struct {
	// contains filtered or unexported fields
}

Headers implements a simple read-only header storage around a DB.

func NewHeaders added in v0.43.0

func NewHeaders(collector module.CacheMetrics, db storage.DB) *Headers

func (*Headers) BlockIDByHeight added in v0.43.0

func (h *Headers) BlockIDByHeight(height uint64) (flow.Identifier, error)

BlockIDByHeight returns the block ID that is finalized at the given height. It is an optimized version of `ByHeight` that skips retrieving the block. Expected errors during normal operations:

  • `storage.ErrNotFound` if no finalized block is known at given height.

func (*Headers) ByBlockID added in v0.43.0

func (h *Headers) ByBlockID(blockID flow.Identifier) (*flow.Header, error)

func (*Headers) ByHeight added in v0.43.0

func (h *Headers) ByHeight(height uint64) (*flow.Header, error)

func (*Headers) ByParentID added in v0.43.0

func (h *Headers) ByParentID(parentID flow.Identifier) ([]*flow.Header, error)

func (*Headers) Exists added in v0.43.0

func (h *Headers) Exists(blockID flow.Identifier) (bool, error)

Exists returns true if a header with the given ID has been stored. No errors are expected during normal operation.

func (*Headers) FindHeaders added in v0.43.0

func (h *Headers) FindHeaders(filter func(header *flow.Header) bool) ([]flow.Header, error)

func (*Headers) RollbackExecutedBlock added in v0.43.0

func (h *Headers) RollbackExecutedBlock(header *flow.Header) error

RollbackExecutedBlock update the executed block header to the given header. only useful for execution node to roll back executed block height This method is not concurrent safe, the caller should make sure to call this method in a single thread.

type IdentifierAndUint32 added in v0.42.0

type IdentifierAndUint32 [flow.IdentifierLen + 4]byte

func KeyFromBlockIDIndex added in v0.39.4

func KeyFromBlockIDIndex(blockID flow.Identifier, txIndex uint32) IdentifierAndUint32

type Index added in v0.43.0

type Index struct {
	// contains filtered or unexported fields
}

Index implements a simple read-only payload storage around a badger DB.

func NewIndex added in v0.43.0

func NewIndex(collector module.CacheMetrics, db storage.DB) *Index

func (*Index) ByBlockID added in v0.43.0

func (i *Index) ByBlockID(blockID flow.Identifier) (*flow.Index, error)

type LatestPersistedSealedResult added in v0.43.0

type LatestPersistedSealedResult struct {
	// contains filtered or unexported fields
}

LatestPersistedSealedResult tracks the most recently persisted sealed execution result processed by the Access ingestion engine.

func NewLatestPersistedSealedResult added in v0.43.0

func NewLatestPersistedSealedResult(
	progress storage.ConsumerProgress,
	headers storage.Headers,
	results storage.ExecutionResults,
) (*LatestPersistedSealedResult, error)

NewLatestPersistedSealedResult creates a new LatestPersistedSealedResult instance.

No errors are expected during normal operation,

func (*LatestPersistedSealedResult) BatchSet added in v0.43.0

func (l *LatestPersistedSealedResult) BatchSet(resultID flow.Identifier, height uint64, batch storage.ReaderBatchWriter) error

BatchSet updates the latest persisted sealed result in a batch operation The resultID and height are added to the provided batch, and the local data is updated only after the batch is successfully committed.

No errors are expected during normal operation,

func (*LatestPersistedSealedResult) Latest added in v0.43.0

Latest returns the ID and height of the latest persisted sealed result.

type LightTransactionResults added in v0.39.4

type LightTransactionResults struct {
	// contains filtered or unexported fields
}

func NewLightTransactionResults added in v0.39.4

func NewLightTransactionResults(collector module.CacheMetrics, db storage.DB, transactionResultsCacheSize uint) *LightTransactionResults

func (*LightTransactionResults) BatchStore added in v0.39.4

func (tr *LightTransactionResults) BatchStore(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, rw storage.ReaderBatchWriter) error

func (*LightTransactionResults) BatchStoreBadger added in v0.39.4

func (tr *LightTransactionResults) BatchStoreBadger(blockID flow.Identifier, transactionResults []flow.LightTransactionResult, batch storage.BatchStorage) error

func (*LightTransactionResults) ByBlockID added in v0.39.4

ByBlockID gets all transaction results for a block, ordered by transaction index

func (*LightTransactionResults) ByBlockIDTransactionID added in v0.39.4

func (tr *LightTransactionResults) ByBlockIDTransactionID(blockID flow.Identifier, txID flow.Identifier) (*flow.LightTransactionResult, error)

ByBlockIDTransactionID returns the transaction result for the given block ID and transaction ID

func (*LightTransactionResults) ByBlockIDTransactionIndex added in v0.39.4

func (tr *LightTransactionResults) ByBlockIDTransactionIndex(blockID flow.Identifier, txIndex uint32) (*flow.LightTransactionResult, error)

ByBlockIDTransactionIndex returns the transaction result for the given blockID and transaction index

type MyExecutionReceipts added in v0.39.4

type MyExecutionReceipts struct {
	// contains filtered or unexported fields
}

MyExecutionReceipts holds and indexes Execution Receipts. MyExecutionReceipts is implemented as a wrapper around badger.ExecutionReceipts The wrapper adds the ability to "MY execution receipt", from the viewpoint of an individual Execution Node.

func NewMyExecutionReceipts added in v0.39.4

func NewMyExecutionReceipts(collector module.CacheMetrics, db storage.DB, receipts storage.ExecutionReceipts) *MyExecutionReceipts

NewMyExecutionReceipts creates instance of MyExecutionReceipts which is a wrapper wrapper around badger.ExecutionReceipts It's useful for execution nodes to keep track of produced execution receipts.

func (*MyExecutionReceipts) BatchRemoveIndexByBlockID added in v0.39.4

func (m *MyExecutionReceipts) BatchRemoveIndexByBlockID(blockID flow.Identifier, rw storage.ReaderBatchWriter) error

BatchRemoveIndexByBlockID removes blockID-to-my-execution-receipt index entry keyed by a blockID in a provided batch No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*MyExecutionReceipts) BatchStoreMyReceipt added in v0.39.4

func (m *MyExecutionReceipts) BatchStoreMyReceipt(receipt *flow.ExecutionReceipt, rw storage.ReaderBatchWriter) error

BatchStoreMyReceipt stores blockID-to-my-receipt index entry keyed by blockID in a provided batch. No errors are expected during normal operation If entity fails marshalling, the error is wrapped in a generic error and returned. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned. If a different my receipt has been indexed for the same block, the error is wrapped in a generic error and returned.

func (*MyExecutionReceipts) MyReceipt added in v0.39.4

func (m *MyExecutionReceipts) MyReceipt(blockID flow.Identifier) (*flow.ExecutionReceipt, error)

MyReceipt retrieves my receipt for the given block. Returns storage.ErrNotFound if no receipt was persisted for the block.

func (*MyExecutionReceipts) RemoveIndexByBlockID added in v0.39.4

func (m *MyExecutionReceipts) RemoveIndexByBlockID(blockID flow.Identifier) error

type NodeDisallowList added in v0.41.0

type NodeDisallowList struct {
	// contains filtered or unexported fields
}

func NewNodeDisallowList added in v0.41.0

func NewNodeDisallowList(db storage.DB) *NodeDisallowList

func (*NodeDisallowList) Retrieve added in v0.41.0

func (dl *NodeDisallowList) Retrieve(disallowList *map[flow.Identifier]struct{}) error

Retrieve reads the set of disallowed nodes from the database. No error is returned if no database entry exists. No errors are expected during normal operations.

func (*NodeDisallowList) Store added in v0.41.0

func (dl *NodeDisallowList) Store(disallowList map[flow.Identifier]struct{}) error

Store writes the given disallowList to the database. To avoid legacy entries in the database, we purge the entire database entry if disallowList is empty. No errors are expected during normal operations.

type Payloads added in v0.43.0

type Payloads struct {
	// contains filtered or unexported fields
}

func NewPayloads added in v0.43.0

func NewPayloads(db storage.DB, index *Index, guarantees *Guarantees, seals *Seals, receipts *ExecutionReceipts,
	results *ExecutionResults) *Payloads

func (*Payloads) ByBlockID added in v0.43.0

func (p *Payloads) ByBlockID(blockID flow.Identifier) (*flow.Payload, error)

type ProtocolKVStore added in v0.43.0

type ProtocolKVStore struct {
	// contains filtered or unexported fields
}

ProtocolKVStore implements persistent storage for storing KV store snapshots.

func NewProtocolKVStore added in v0.43.0

func NewProtocolKVStore(collector module.CacheMetrics,
	db storage.DB,
	kvStoreCacheSize uint,
	kvStoreByBlockIDCacheSize uint,
) *ProtocolKVStore

NewProtocolKVStore creates a ProtocolKVStore instance, which is a database holding KV store snapshots. It supports storing, caching and retrieving by ID or the additionally indexed block ID.

func (*ProtocolKVStore) BatchIndex added in v0.43.0

func (s *ProtocolKVStore) BatchIndex(rw storage.ReaderBatchWriter, blockID flow.Identifier, stateID flow.Identifier) error

BatchIndex returns an anonymous function intended to be executed as part of a database transaction. In a nutshell, we want to maintain a map from `blockID` to `stateID`, where `blockID` references the block that _proposes_ updated key-value store. Upon call, the anonymous function persists the specific map entry in the node's database. Protocol convention:

  • Consider block B, whose ingestion might potentially lead to an updated KV store. For example, the KV store changes if we seal some execution results emitting specific service events.
  • For the key `blockID`, we use the identity of block B which _proposes_ this updated KV store.
  • CAUTION: The updated state requires confirmation by a QC and will only become active at the child block, _after_ validating the QC.

Expected errors during normal operations:

  • storage.ErrDataMismatch if a KV store for the given blockID has already been indexed, but different

func (*ProtocolKVStore) BatchStore added in v0.43.0

BatchStore stores the protocol state key value data with the given stateID.into the database Expected errors during normal operations: - storage.ErrDataMismatch if a KV store for the given stateID has already been indexed, but different

func (*ProtocolKVStore) ByBlockID added in v0.43.0

func (s *ProtocolKVStore) ByBlockID(blockID flow.Identifier) (*flow.PSKeyValueStoreData, error)

ByBlockID retrieves the kv-store snapshot that the block with the given ID proposes. CAUTION: this store snapshot requires confirmation by a QC and will only become active at the child block, _after_ validating the QC. Protocol convention:

  • Consider block B, whose ingestion might potentially lead to an updated KV store state. For example, the state changes if we seal some execution results emitting specific service events.
  • For the key `blockID`, we use the identity of block B which _proposes_ this updated KV store. As value, the hash of the resulting state at the end of processing B is to be used.
  • CAUTION: The updated state requires confirmation by a QC and will only become active at the child block, _after_ validating the QC.

Expected errors during normal operations:

  • storage.ErrNotFound if no snapshot has been indexed for the given block.

func (*ProtocolKVStore) ByID added in v0.43.0

ByID retrieves the KV store snapshot with the given state ID. Expected errors during normal operations:

  • storage.ErrNotFound if no snapshot with the given Identifier is known.

type QuorumCertificates added in v0.43.0

type QuorumCertificates struct {
	// contains filtered or unexported fields
}

QuorumCertificates implements persistent storage for quorum certificates.

func NewQuorumCertificates added in v0.43.0

func NewQuorumCertificates(collector module.CacheMetrics, db storage.DB, cacheSize uint) *QuorumCertificates

NewQuorumCertificates Creates QuorumCertificates instance which is a database of quorum certificates which supports storing, caching and retrieving by block ID.

func (*QuorumCertificates) BatchStore added in v0.43.0

BatchStore stores a Quorum Certificate as part of database batch update. QC is indexed by QC.BlockID. * storage.ErrAlreadyExists if a different QC for blockID is already stored

func (*QuorumCertificates) ByBlockID added in v0.43.0

func (q *QuorumCertificates) ByBlockID(blockID flow.Identifier) (*flow.QuorumCertificate, error)

func (*QuorumCertificates) StoreTx

type ResultApprovals

type ResultApprovals struct {
	// contains filtered or unexported fields
}

ResultApprovals implements persistent storage for result approvals.

CAUTION suitable only for _Verification Nodes_ for persisting their _own_ approvals!

  • In general, the Flow protocol requires multiple approvals for the same chunk from different verification nodes. In other words, there are multiple different approvals for the same chunk.
  • Internally, ResultApprovals populates an index from Executed Chunk ➜ ResultApproval. This is *only safe* for Verification Nodes when tracking their own approvals (for the same ExecutionResult, a Verifier will always produce the same approval)

func NewResultApprovals

func NewResultApprovals(collector module.CacheMetrics, db storage.DB, lockManager lockctx.Manager) *ResultApprovals

func (*ResultApprovals) ByChunk

func (r *ResultApprovals) ByChunk(resultID flow.Identifier, chunkIndex uint64) (*flow.ResultApproval, error)

ByChunk retrieves a ResultApproval by result ID and chunk index. The ResultApprovals store is only used within a verification node, where it is assumed that there is never more than one approval per chunk.

func (*ResultApprovals) ByID

func (r *ResultApprovals) ByID(approvalID flow.Identifier) (*flow.ResultApproval, error)

ByID retrieves a ResultApproval by its ID

func (*ResultApprovals) StoreMyApproval added in v0.43.0

func (r *ResultApprovals) StoreMyApproval(lctx lockctx.Proof, approval *flow.ResultApproval) error

StoreMyApproval stores my own ResultApproval No errors are expected during normal operations. it also indexes a ResultApproval by result ID and chunk index.

CAUTION: the Flow protocol requires multiple approvals for the same chunk from different verification nodes. In other words, there are multiple different approvals for the same chunk. Therefore, the index Executed Chunk ➜ ResultApproval ID (populated here) is *only safe* to be used by Verification Nodes for tracking their own approvals.

For the same ExecutionResult, a Verifier will always produce the same approval. Therefore, this operation is idempotent, i.e. repeated calls with the *same inputs* are equivalent to just calling the method once; still the method succeeds on each call. However, when attempting to index *different* ResultApproval IDs for the same key (resultID, chunkIndex) this method returns an exception, as this should never happen for a correct Verification Node indexing its own approvals.

type Seals added in v0.43.0

type Seals struct {
	// contains filtered or unexported fields
}

func NewSeals added in v0.43.0

func NewSeals(collector module.CacheMetrics, db storage.DB) *Seals

func (*Seals) ByID added in v0.43.0

func (s *Seals) ByID(sealID flow.Identifier) (*flow.Seal, error)

func (*Seals) FinalizedSealForBlock added in v0.43.0

func (s *Seals) FinalizedSealForBlock(blockID flow.Identifier) (*flow.Seal, error)

FinalizedSealForBlock returns the seal for the given block, only if that seal has been included in a finalized block. Returns storage.ErrNotFound if the block is unknown or unsealed.

func (*Seals) HighestInFork added in v0.43.0

func (s *Seals) HighestInFork(blockID flow.Identifier) (*flow.Seal, error)

HighestInFork retrieves the highest seal that was included in the fork up to (and including) blockID. This method should return a seal for any block known to the node. Returns storage.ErrNotFound if blockID is unknown.

func (*Seals) Store added in v0.43.0

func (s *Seals) Store(seal *flow.Seal) error

type ServiceEvents added in v0.39.4

type ServiceEvents struct {
	// contains filtered or unexported fields
}

func NewServiceEvents added in v0.39.4

func NewServiceEvents(collector module.CacheMetrics, db storage.DB) *ServiceEvents

func (*ServiceEvents) BatchRemoveByBlockID added in v0.39.4

func (e *ServiceEvents) BatchRemoveByBlockID(blockID flow.Identifier, rw storage.ReaderBatchWriter) error

BatchRemoveByBlockID removes service events keyed by a blockID in provided batch No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*ServiceEvents) BatchStore added in v0.39.4

func (e *ServiceEvents) BatchStore(blockID flow.Identifier, events []flow.Event, rw storage.ReaderBatchWriter) error

BatchStore stores service events keyed by a blockID in provided batch No errors are expected during normal operation, even if no entries are matched. If Badger unexpectedly fails to process the request, the error is wrapped in a generic error and returned.

func (*ServiceEvents) ByBlockID added in v0.39.4

func (e *ServiceEvents) ByBlockID(blockID flow.Identifier) ([]flow.Event, error)

ByBlockID returns the events for the given block ID

func (*ServiceEvents) RemoveByBlockID added in v0.39.4

func (e *ServiceEvents) RemoveByBlockID(blockID flow.Identifier) error

RemoveByBlockID removes service events by block ID

type TransactionResultErrorMessages added in v0.39.4

type TransactionResultErrorMessages struct {
	// contains filtered or unexported fields
}

func NewTransactionResultErrorMessages added in v0.39.4

func NewTransactionResultErrorMessages(collector module.CacheMetrics, db storage.DB, transactionResultsCacheSize uint) *TransactionResultErrorMessages

func (*TransactionResultErrorMessages) BatchStore added in v0.43.0

func (t *TransactionResultErrorMessages) BatchStore(
	blockID flow.Identifier,
	transactionResultErrorMessages []flow.TransactionResultErrorMessage,
	batch storage.ReaderBatchWriter,
) error

BatchStore inserts a batch of transaction result error messages into a batch

No errors are expected during normal operation.

func (*TransactionResultErrorMessages) ByBlockID added in v0.39.4

ByBlockID gets all transaction result error messages for a block, ordered by transaction index. Note: This method will return an empty slice both if the block is not indexed yet and if the block does not have any errors.

No errors are expected during normal operations.

func (*TransactionResultErrorMessages) ByBlockIDTransactionID added in v0.39.4

func (t *TransactionResultErrorMessages) ByBlockIDTransactionID(blockID flow.Identifier, transactionID flow.Identifier) (*flow.TransactionResultErrorMessage, error)

ByBlockIDTransactionID returns the transaction result error message for the given block ID and transaction ID

Expected errors during normal operation:

  • `storage.ErrNotFound` if no transaction error message is known at given block and transaction id.

func (*TransactionResultErrorMessages) ByBlockIDTransactionIndex added in v0.39.4

func (t *TransactionResultErrorMessages) ByBlockIDTransactionIndex(blockID flow.Identifier, txIndex uint32) (*flow.TransactionResultErrorMessage, error)

ByBlockIDTransactionIndex returns the transaction result error message for the given blockID and transaction index

Expected errors during normal operation:

  • `storage.ErrNotFound` if no transaction error message is known at given block and transaction index.

func (*TransactionResultErrorMessages) Exists added in v0.39.4

Exists returns true if transaction result error messages for the given ID have been stored.

No errors are expected during normal operation.

func (*TransactionResultErrorMessages) Store added in v0.39.4

func (t *TransactionResultErrorMessages) Store(blockID flow.Identifier, transactionResultErrorMessages []flow.TransactionResultErrorMessage) error

Store will store transaction result error messages for the given block ID.

No errors are expected during normal operation.

type TransactionResults added in v0.39.4

type TransactionResults struct {
	// contains filtered or unexported fields
}

func NewTransactionResults added in v0.39.4

func NewTransactionResults(collector module.CacheMetrics, db storage.DB, transactionResultsCacheSize uint) *TransactionResults

func (*TransactionResults) BatchRemoveByBlockID added in v0.39.4

func (tr *TransactionResults) BatchRemoveByBlockID(blockID flow.Identifier, batch storage.ReaderBatchWriter) error

BatchRemoveByBlockID batch removes transaction results by block ID

func (*TransactionResults) BatchStore added in v0.39.4

func (tr *TransactionResults) BatchStore(blockID flow.Identifier, transactionResults []flow.TransactionResult, batch storage.ReaderBatchWriter) error

BatchStore will store the transaction results for the given block ID in a batch

func (*TransactionResults) ByBlockID added in v0.39.4

func (tr *TransactionResults) ByBlockID(blockID flow.Identifier) ([]flow.TransactionResult, error)

ByBlockID gets all transaction results for a block, ordered by transaction index

func (*TransactionResults) ByBlockIDTransactionID added in v0.39.4

func (tr *TransactionResults) ByBlockIDTransactionID(blockID flow.Identifier, txID flow.Identifier) (*flow.TransactionResult, error)

ByBlockIDTransactionID returns the runtime transaction result for the given block ID and transaction ID

func (*TransactionResults) ByBlockIDTransactionIndex added in v0.39.4

func (tr *TransactionResults) ByBlockIDTransactionIndex(blockID flow.Identifier, txIndex uint32) (*flow.TransactionResult, error)

ByBlockIDTransactionIndex returns the runtime transaction result for the given block ID and transaction index

func (*TransactionResults) RemoveByBlockID added in v0.39.4

func (tr *TransactionResults) RemoveByBlockID(blockID flow.Identifier) error

RemoveByBlockID removes transaction results by block ID

type Transactions added in v0.39.4

type Transactions struct {
	// contains filtered or unexported fields
}

Transactions ...

func NewTransactions added in v0.39.4

func NewTransactions(cacheMetrics module.CacheMetrics, db storage.DB) *Transactions

NewTransactions ...

func (*Transactions) BatchStore added in v0.43.0

func (t *Transactions) BatchStore(tx *flow.TransactionBody, batch storage.ReaderBatchWriter) error

BatchStore stores transaction within a batch operation. No errors are expected during normal operations

func (*Transactions) ByID added in v0.39.4

func (*Transactions) RemoveBatch added in v0.41.0

func (t *Transactions) RemoveBatch(rw storage.ReaderBatchWriter, txID flow.Identifier) error

RemoveBatch removes a transaction by fingerprint.

func (*Transactions) Store added in v0.39.4

func (t *Transactions) Store(flowTx *flow.TransactionBody) error

type TwoIdentifier added in v0.42.0

type TwoIdentifier [flow.IdentifierLen * 2]byte

func KeyFromBlockIDTransactionID added in v0.39.4

func KeyFromBlockIDTransactionID(blockID flow.Identifier, txID flow.Identifier) TwoIdentifier

type VersionBeacons added in v0.39.2

type VersionBeacons struct {
	// contains filtered or unexported fields
}

func NewVersionBeacons added in v0.39.2

func NewVersionBeacons(db storage.DB) *VersionBeacons

func (*VersionBeacons) Highest added in v0.39.2

func (r *VersionBeacons) Highest(
	belowOrEqualTo uint64,
) (*flow.SealedVersionBeacon, error)

Directories

Path Synopsis
inmemory

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL