mempool

package
v0.43.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 22, 2025 License: AGPL-3.0 Imports: 8 Imported by: 16

README

The mempool module

The mempool module provides mempool implementations for the Flow blockchain, which are in-memory data structures that are tasked with storing the flow.Entity objects. flow.Entity objects are the fundamental data model of the Flow blockchain, and every Flow primitives such as transactions, blocks, and collections are represented as flow.Entity objects.

Each mempool implementation is tasked for storing a specific type of flow.Entity. As a convention, all mempools are built on top of the stdmap.Backend struct, which provides a thread-safe cache implementation for storing and retrieving flow.Entity objects. The primary responsibility of the stdmap.Backend struct is to provide thread-safety for its underlying data model (i.e., mempool.Backdata) that is tasked with maintaining the actual flow.Entity objects.

At the moment, the mempool module provides two implementations for the mempool.Backdata:

  • backdata.Backdata: a map implementation for storing flow.Entity objects using native Go maps.
  • herocache.Cache: a cache implementation for storing flow.Entity objects, which is a heap-optimized cache implementation that is aims on minimizing the memory footprint of the mempool on the heap and reducing the GC pressure.

Note-1: by design the mempool.Backdata interface is not thread-safe. Therefore, it is the responsibility of the stdmap.Backend struct to provide thread-safety for its underlying mempool.Backdata implementation.

Note-2: The herocache.Cache implementation is several orders of magnitude faster than the backdata.Backdata on high-throughput workloads. For the read or write-heavy workloads, the herocache.Cache implementation is recommended as the underlying mempool.Backdata implementation.

Documentation

Index

Constants

View Source
const DefaultChunkDataPackRequestQueueSize = 100_000

Variables

This section is empty.

Functions

func IsBelowPrunedThresholdError added in v0.29.0

func IsBelowPrunedThresholdError(err error) bool

IsBelowPrunedThresholdError returns whether the given error is an BelowPrunedThresholdError error

func IsUnknownExecutionResultError added in v0.18.3

func IsUnknownExecutionResultError(err error) bool

IsUnknownExecutionResultError returns whether the given error is an UnknownExecutionResultError error

func NewBelowPrunedThresholdErrorf added in v0.29.0

func NewBelowPrunedThresholdErrorf(msg string, args ...interface{}) error

func NewUnknownExecutionResultErrorf added in v0.18.3

func NewUnknownExecutionResultErrorf(msg string, args ...interface{}) error

Types

type Assignments

Assignments represents a concurrency-safe memory pool for chunk assignments.

type BackData added in v0.23.9

type BackData[K comparable, V any] interface {
	// Has checks if backdata already stores a value under the given key.
	Has(key K) bool

	// Add attempts to add the given value to the backdata, without overwriting existing data.
	// If a value is already stored under the input key, Add is a no-op and returns false.
	// If no value is stored under the input key, Add adds the value and returns true.
	Add(key K, value V) bool

	// Remove removes the value with the given key.
	// If the key-value pair exists, returns the value and true.
	// Otherwise, returns the zero value for type V and false.
	Remove(key K) (V, bool)

	// Get returns the value for the given key.
	// Returns true if the key-value pair exists, and false otherwise.
	Get(key K) (V, bool)

	// Size returns the number of stored key-value pairs.
	Size() uint

	// All returns all stored key-value pairs as a map.
	All() map[K]V

	// Keys returns an unordered list of keys stored in the backdata.
	Keys() []K

	// Values returns an unordered list of values stored in the backdata.
	Values() []V

	// Clear removes all key-value pairs from the backdata.
	Clear()
}

BackData represents the underlying immutable generic key-value data structure used by mempool.Backend as the core structure of maintaining data on memory pools.

This interface provides fundamental operations for storing, retrieving, and removing data structures, but it does not support mutating already stored data. If modifications to the stored data is required, use MutableBackData instead.

NOTE: BackData by default is not expected to provide concurrency-safe operations. As it is just the model layer of the mempool, the safety against concurrent operations are guaranteed by the Backend that is the control layer.

type BelowPrunedThresholdError added in v0.29.0

type BelowPrunedThresholdError struct {
	// contains filtered or unexported fields
}

BelowPrunedThresholdError indicates that we are attempting to query or prune a mempool by a key (typically block height or block view) which is lower than the lowest retained key threshold. In other words, we have already pruned above the specified key value.

func (BelowPrunedThresholdError) Error added in v0.29.0

func (BelowPrunedThresholdError) Unwrap added in v0.29.0

func (e BelowPrunedThresholdError) Unwrap() error

type BlockFilter added in v0.14.0

type BlockFilter func(header *flow.Header) bool

BlockFilter is used for controlling the ExecutionTree's Execution Tree search. The search only traverses to results for blocks which pass the filter. If an the block for an execution result does not pass the filter, the entire sub-tree of derived results is not traversed.

type ChunkDataPackRequest added in v0.28.0

type ChunkDataPackRequest struct {
	// Identifier of the chunk.
	ChunkId flow.Identifier
	// Identifier of the requester node.
	RequesterId flow.Identifier
}

ChunkDataPackRequest is an internal data type for Execution Nodes that represents a request for a chunk data pack.

type ChunkRequestHistoryUpdaterFunc added in v0.17.0

type ChunkRequestHistoryUpdaterFunc func(uint64, time.Duration) (uint64, time.Duration, bool)

ChunkRequestHistoryUpdaterFunc is a function type that used by ChunkRequests mempool to perform atomic and isolated updates on the underlying chunk requests history.

func ExponentialUpdater added in v0.17.0

func ExponentialUpdater(multiplier float64, maxInterval time.Duration, minInterval time.Duration) ChunkRequestHistoryUpdaterFunc

ExponentialUpdater is a chunk request history updater factory that updates the retryAfter value of a request to multiplier * retryAfter. For example, if multiplier = 2, then invoking it n times results in a retryAfter value of 2^n * retryAfter, which follows an exponential series.

It also keeps updated retryAfter value between the minInterval and maxInterval inclusive. It means that if updated retryAfter value is below minInterval, it is bumped up to the minInterval. Also, if updated retryAfter value is above maxInterval, it is skimmed off back to the maxInterval.

Note: if initial retryAfter is below minInterval, the first call to this function returns minInterval, and hence after the nth invocations, the retryAfter value is set to 2^(n-1) * minInterval.

func IncrementalAttemptUpdater added in v0.17.0

func IncrementalAttemptUpdater() ChunkRequestHistoryUpdaterFunc

IncrementalAttemptUpdater is a chunk request history updater factory that increments the attempt field of request status and makes it instantly available against any retryAfter qualifier.

type ChunkRequests added in v0.17.0

type ChunkRequests interface {
	// RequestHistory returns the number of times the chunk has been requested,
	// last time the chunk has been requested, and the retryAfter duration of the
	// underlying request status of this chunk.
	//
	// The last boolean parameter returns whether a chunk request for this chunk ID
	// exists in memory-pool.
	RequestHistory(chunkID flow.Identifier) (uint64, time.Time, time.Duration, bool)

	// Add provides insertion functionality into the memory pool.
	// The insertion is only successful if there is no duplicate chunk request with the same
	// chunk ID in the memory. Otherwise, it aborts the insertion and returns false.
	Add(request *verification.ChunkDataPackRequest) bool

	// Remove provides deletion functionality from the memory pool.
	// If there is a chunk request with this ID, Remove removes it and returns true.
	// Otherwise, it returns false.
	Remove(chunkID flow.Identifier) bool

	// PopAll atomically returns all locators associated with this chunk ID while clearing out the
	// chunk request status for this chunk id.
	// Boolean return value indicates whether there are requests in the memory pool associated
	// with chunk ID.
	PopAll(chunkID flow.Identifier) (chunks.LocatorMap, bool)

	// IncrementAttempt increments the Attempt field of the corresponding status of the
	// chunk request in memory pool that has the specified chunk ID.
	// If such chunk ID does not exist in the memory pool, it returns false.
	//
	// The increments are done atomically, thread-safe, and in isolation.
	IncrementAttempt(chunkID flow.Identifier) bool

	// UpdateRequestHistory updates the request history of the specified chunk ID. If the update was successful, i.e.,
	// the updater returns true, the result of update is committed to the mempool, and the time stamp of the chunk request
	// is updated to the current time. Otherwise, it aborts and returns false.
	//
	// It returns the updated request history values.
	//
	// The updates under this method are atomic, thread-safe, and done in isolation.
	UpdateRequestHistory(chunkID flow.Identifier, updater ChunkRequestHistoryUpdaterFunc) (uint64, time.Time, time.Duration, bool)

	// All returns all chunk requests stored in this memory pool.
	All() verification.ChunkDataPackRequestInfoList

	// Size returns total number of chunk requests in the memory pool.
	Size() uint
}

ChunkRequests is an in-memory storage for maintaining chunk data pack requests.

type ChunkStatuses added in v0.17.0

ChunkStatuses is an in-memory storage for maintaining the chunk status data objects.

type DNSCache added in v0.23.9

type DNSCache interface {
	// PutIpDomain adds the given ip domain into cache.
	// The int64 argument is the timestamp associated with the domain.
	PutIpDomain(string, []net.IPAddr, int64) bool

	// PutTxtRecord adds the given txt record into the cache.
	// The int64 argument is the timestamp associated with the domain.
	PutTxtRecord(string, []string, int64) bool

	// GetDomainIp returns the ip domain if exists in the cache.
	// The boolean return value determines if domain exists in the cache.
	GetDomainIp(string) (*IpRecord, bool)

	// GetTxtRecord returns the txt record if exists in the cache.
	// The boolean return value determines if record exists in the cache.
	GetTxtRecord(string) (*TxtRecord, bool)

	// LockIPDomain locks an ip address dns record if exists in the cache.
	// The boolean return value determines whether attempt on locking was successful.
	//
	// A locking attempt is successful when the domain record exists in the cache and has not
	// been locked before.
	// Once a domain record gets locked the only way to unlock it is through updating that record.
	//
	// The locking process is defined to record that a resolving attempt is ongoing for an expired domain.
	// So the locking happens to avoid any other parallel resolving.
	LockIPDomain(string) (bool, error)

	// LockTxtRecord locks a txt address dns record if exists in the cache.
	// The boolean return value determines whether attempt on locking was successful.
	//
	// A locking attempt is successful when the domain record exists in the cache and has not
	// been locked before.
	// Once a domain record gets locked the only way to unlock it is through updating that record.
	//
	// The locking process is defined to record that a resolving attempt is ongoing for an expired domain.
	// So the locking happens to avoid any other parallel resolving.
	LockTxtRecord(string) (bool, error)

	// RemoveIp removes an ip domain from cache.
	RemoveIp(string) bool

	// RemoveTxt removes a txt record from cache.
	RemoveTxt(string) bool

	// UpdateTxtRecord updates the dns record for the given txt domain with the new address and timestamp values.
	UpdateTxtRecord(string, []string, int64) error

	// UpdateIPDomain updates the dns record for the given ip domain with the new address and timestamp values.
	UpdateIPDomain(string, []net.IPAddr, int64) error

	// Size returns total domains maintained into this cache.
	// The first returned value determines number of ip domains.
	// The second returned value determines number of txt records.
	Size() (uint, uint)
}

DNSCache provides an in-memory cache for storing dns entries.

type ExecutionData added in v0.31.0

ExecutionData represents a concurrency-safe memory pool for BlockExecutionData.

type ExecutionTree added in v0.14.0

type ExecutionTree interface {

	// AddResult adds an Execution Result to the Execution Tree (without any receipts), in
	// case the result is not already stored in the tree.
	// This is useful for crash recovery:
	// After recovering from a crash, the mempools are wiped and the sealed results will not
	// be stored in the Execution Tree anymore. Adding the result to the tree allows to create
	// a vertex in the tree without attaching any Execution Receipts to it.
	AddResult(result *flow.ExecutionResult, block *flow.Header) error

	// AddReceipt adds the given execution receipt to the memory pool. Requires height
	// of the block the receipt is for. We enforce data consistency on an API
	// level by using the block header as input.
	AddReceipt(receipt *flow.ExecutionReceipt, block *flow.Header) (bool, error)

	// HasReceipt returns true if the given receipt is already present in the mempool.
	HasReceipt(receipt *flow.ExecutionReceipt) bool

	// ReachableReceipts returns a slice of ExecutionReceipt, whose result
	// is computationally reachable from resultID. Context:
	// * Conceptually, the Execution results form a tree, which we refer to as
	//   Execution Tree. A branch in the execution can be due to a fork in the main
	//   chain. Furthermore, the execution branches if ENs disagree about the result
	//   for the same block.
	// * As the ID of an execution result contains the BlockID, which the result
	//   for, all Execution Results with the same ID necessarily are for the same
	//   block. All Execution Receipts committing to the same result from an
	//   equivalence class and can be represented as one vertex in the Execution
	//   Tree.
	// * An execution result r1 points (field ExecutionResult.ParentResultID) to
	//   its parent result r0 , whose end state was used as the starting state
	//   to compute r1. Formally, we have an edge r0 -> r1 in the Execution Tree,
	//   if a result r1 is stored in the mempool, whose ParentResultID points to
	//   r0.
	// ReachableReceipts implements a tree search on the Execution Tree starting
	// from the provided resultID. Execution Receipts are traversed in a
	// parent-first manner, meaning that a the parent result is traversed
	// _before_ any of its derived results. The algorithm only traverses to
	// results, for which there exists a sequence of interim result in the
	// mempool without any gaps.
	//
	// Two filters are supplied:
	// * blockFilter: the tree search will only travers to results for
	//   blocks which pass the filter. Often higher-level logic is only
	//   interested in results for blocks in a specific fork. Such can be
	//   implemented by a suitable blockFilter.
	// * receiptFilter: for a reachable result (subject to the restrictions
	//   imposed by blockFilter, all known receipts are returned.
	//   While _all_ Receipts for the parent result are guaranteed to be
	//   listed before the receipts for the derived results, there is no
	//   specific ordering for the receipts committing to the same result
	//   (random order). If only a subset of receipts for a result is desired
	//   (e.g. for de-duplication with parent blocks), receiptFilter should
	//   be used.
	// Note the important difference between the two filters:
	// * The blockFilter suppresses traversal to derived results.
	// * The receiptFilter does _not_ suppresses traversal to derived results.
	//   Only individual receipts are dropped.
	//
	// Error returns:
	// * UnknownExecutionResultError (sentinel) if resultID is unknown
	// * all other error are unexpected and potential indicators of corrupted internal state
	ReachableReceipts(resultID flow.Identifier, blockFilter BlockFilter, receiptFilter ReceiptFilter) ([]*flow.ExecutionReceipt, error)

	// Size returns the number of receipts stored in the mempool
	Size() uint

	// PruneUpToHeight prunes all results for all blocks with height up to but
	// NOT INCLUDING `newLowestHeight`. Errors if newLowestHeight is smaller than
	// the previous value (as we cannot recover previously pruned results).
	PruneUpToHeight(newLowestHeight uint64) error

	// LowestHeight returns the lowest height, where results are still
	// stored in the mempool.
	LowestHeight() uint64
}

ExecutionTree represents a concurrency-safe memory pool for execution Receipts. Its is aware of the tree structure formed by execution results. All execution receipts for the _same result_ form an equivalence class and are represented by _one_ vertex in the execution tree. The mempool utilizes knowledge about the height of the block the result is for. Hence, the Mempool can only store and process Receipts whose block is known.

Implementations are concurrency safe.

type Guarantees

Guarantees represents a concurrency-safe memory pool for collection guarantees.

type IdentifierMap

type IdentifierMap interface {
	// Append will append the id to the list of identifiers associated with key.
	Append(key, id flow.Identifier)

	// Remove removes the given key with all associated identifiers.
	Remove(key flow.Identifier) bool

	// RemoveIdFromKey removes the id from the list of identifiers associated with key.
	// If the list becomes empty, it also removes the key from the map.
	RemoveIdFromKey(key, id flow.Identifier) error

	// Get returns list of all identifiers associated with key and true, if the key exists in the mempool.
	// Otherwise it returns nil and false.
	Get(key flow.Identifier) (flow.IdentifierList, bool)

	// Has returns true if the key exists in the map, i.e., there is at least an id
	// attached to it.
	Has(key flow.Identifier) bool

	// Keys returns a list of all keys in the mempool.
	Keys() (flow.IdentifierList, bool)

	// Size returns the number of items in the mempool.
	Size() uint
}

IdentifierMap represents a concurrency-safe memory pool for sets of Identifier (keyed by some Identifier).

type IncorporatedResultSeals added in v0.11.0

type IncorporatedResultSeals interface {
	// Add adds an IncorporatedResultSeal to the mempool.
	Add(irSeal *flow.IncorporatedResultSeal) (bool, error)

	// All returns all the IncorporatedResultSeals in the mempool.
	All() []*flow.IncorporatedResultSeal

	// Get returns an IncorporatedResultSeal by IncorporatedResult ID.
	Get(flow.Identifier) (*flow.IncorporatedResultSeal, bool)

	// Limit returns the size limit of the mempool.
	Limit() uint

	// Remove removes an IncorporatedResultSeal from the mempool.
	Remove(incorporatedResultID flow.Identifier) bool

	// Size returns the number of items in the mempool.
	Size() uint

	// Clear removes all entities from the pool.
	Clear()

	// PruneUpToHeight remove all seals for blocks whose height is strictly
	// smaller that height. Note: seals for blocks at height are retained.
	// After pruning, seals below for blocks below the given height are dropped.
	//
	// Monotonicity Requirement:
	// The pruned height cannot decrease, as we cannot recover already pruned elements.
	// If `height` is smaller than the previous value, the previous value is kept
	// and the sentinel mempool.BelowPrunedThresholdError is returned.
	PruneUpToHeight(height uint64) error
}

IncorporatedResultSeals represents a concurrency safe memory pool for incorporated result seals.

type IpRecord added in v0.27.0

type IpRecord struct {
	Domain    string
	Addresses []net.IPAddr
	Timestamp int64

	// An IpRecord is locked when it is expired and a resolving is ongoing for it.
	// A locked IpRecord is unlocked by updating it through the cache.
	Locked bool
}

IpRecord represents the data model for maintaining an ip dns record in cache.

type Mempool added in v0.43.0

type Mempool[K comparable, V any] interface {
	// Has checks if a value is stored under the given key.
	Has(K) bool
	// Get returns the value for the given key.
	// Returns true if the key-value pair exists, and false otherwise.
	Get(K) (V, bool)
	// Add attempts to add the given value, without overwriting existing data.
	// If a value is already stored under the input key, Add is a no-op and returns false.
	// If no value is stored under the input key, Add adds the value and returns true.
	Add(K, V) bool
	// Remove removes the value with the given key.
	// If the key-value pair exists, returns the value and true.
	// Otherwise, returns the zero value for type V and false.
	Remove(K) bool
	// Adjust will adjust the value item using the given function if the given key can be found.
	// Returns:
	//   - value, true if the value with the given key was found. The returned value is the version after the update is applied.
	//   - nil, false if no value with the given key was found
	Adjust(key K, f func(V) V) (V, bool)
	// Size will return the size of the mempool.
	Size() uint
	// Values returns all stored values from the mempool.
	Values() []V
	// All returns all stored key-value pairs as a map from the mempool.
	All() map[K]V
	// Clear removes all key-value pairs from the mempool.
	Clear()
}

Mempool is a generic interface for concurrency-safe memory pool.

type MutableBackData added in v0.43.0

type MutableBackData[K comparable, V any] interface {
	BackData[K, V]

	// Adjust adjusts the value using the given function if the given key can be found.
	// Returns:
	//    - value, true if the value with the given key was found. The returned value is the version after the update is applied.
	//    - nil, false if no value with the given key was found
	Adjust(key K, f func(value V) V) (V, bool)

	// AdjustWithInit adjusts the value using the given function if the given key can be found. When the
	// value is not found, it initializes the value using the given init function and then applies the adjust function.
	// Args:
	// - key: the identifier of the value to adjust.
	// - adjust: the function that adjusts the value.
	// - init: the function that initializes the value when it is not found.
	// Returns:
	//   - the adjusted value.
	//   - a bool which indicates whether the value was either added or adjusted.
	AdjustWithInit(key K, adjust func(value V) V, init func() V) (V, bool)
}

MutableBackData extends BackData by allowing modifications to stored data structures. Unlike BackData, this interface supports adjusting existing data structures, making it suitable for use cases where they do not have a cryptographic hash function.

WARNING: Entities that are cryptographically protected, such as Entity objects tied to signatures or hashes, should not be modified. Use BackData instead to prevent unintended mutations.

type OnEjection added in v0.13.0

type OnEjection[V any] func(V)

OnEjection is a callback which a mempool executes on ejecting one of its elements. The callbacks are executed from within the thread that serves the mempool. Implementations should be non-blocking.

type PendingReceipts added in v0.14.1

type PendingReceipts interface {
	// Add a pending receipt
	// return true if added
	// return false if is a duplication
	Add(receipt *flow.ExecutionReceipt) bool

	// Remove a pending receipt by ID
	Remove(receiptID flow.Identifier) bool

	// ByPreviousResultID returns all the pending receipts whose previous result id
	// matches the given result id
	ByPreviousResultID(previousResultID flow.Identifier) []*flow.ExecutionReceipt

	// PruneUpToHeight remove all receipts for blocks whose height is strictly
	// smaller that height. Note: receipts for blocks at height are retained.
	// After pruning, receipts below for blocks below the given height are dropped.
	//
	// Monotonicity Requirement:
	// The pruned height cannot decrease, as we cannot recover already pruned elements.
	// If `height` is smaller than the previous value, the previous value is kept
	// and the sentinel mempool.BelowPrunedThresholdError is returned.
	PruneUpToHeight(height uint64) error
}

PendingReceipts stores pending receipts indexed by the id. It also maintains a secondary index on the previous result id, which is unique, in order to allow to find a receipt by the previous result id.

type ReceiptFilter added in v0.14.0

type ReceiptFilter func(receipt *flow.ExecutionReceipt) bool

ReceiptFilter is used to drop specific receipts from. It does NOT affect the ExecutionTree's Execution Tree search.

type TransactionTimings

type TransactionTimings Mempool[flow.Identifier, *flow.TransactionTiming]

TransactionTimings represents a concurrency-safe memory pool for transaction timings.

type Transactions

type Transactions interface {
	Mempool[flow.Identifier, *flow.TransactionBody]
	// ByPayer retrieves all transactions from the memory pool that are sent
	// by the given payer.
	ByPayer(payer flow.Address) []*flow.TransactionBody
}

Transactions represents a concurrency-safe memory pool for transactions.

type TxtRecord added in v0.27.0

type TxtRecord struct {
	Txt       string
	Records   []string
	Timestamp int64

	// A TxtRecord is locked when it is expired and a resolving is ongoing for it.
	// A locked TxtRecord is unlocked by updating it through the cache.
	Locked bool
}

TxtRecord represents the data model for maintaining a txt dns record in cache.

type UnknownExecutionResultError added in v0.18.3

type UnknownExecutionResultError struct {
	// contains filtered or unexported fields
}

UnknownExecutionResultError indicates that the Execution Result is unknown

func (UnknownExecutionResultError) Error added in v0.18.3

func (UnknownExecutionResultError) Unwrap added in v0.18.3

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL