requester

package
v0.45.0-internal-rc.2
Warning

This package is not in the latest version of its module.

Published: Jan 27, 2026 License: AGPL-3.0 Imports: 20 Imported by: 6

Documentation

Index

Constants

const DefaultEntityRequestCacheSize = 500

DefaultEntityRequestCacheSize is the default maximum message queue size for the requester engine. Assuming a maximum size of 10MB per message, a full queue would consume ~5GB of memory (10MB × 500). While most messages (such as execution receipts) are significantly smaller than 10MB, some messages, like chunk data packs, can be significantly larger. Users should tune this parameter for their use case and ensure that enough memory is available.
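The memory estimate above can be reproduced with a few lines of arithmetic. This is a minimal sketch: the helper `worstCaseQueueBytes` is hypothetical (not part of this package), and the 10MB figure is the assumed per-message maximum from the comment, taken here as decimal megabytes.

```go
package main

import "fmt"

// worstCaseQueueBytes returns the upper bound on memory held by a full queue,
// assuming every queued message has the maximum size.
// Hypothetical helper for illustration; not part of the requester package.
func worstCaseQueueBytes(queueSize, maxMessageBytes uint64) uint64 {
	return queueSize * maxMessageBytes
}

func main() {
	const queueSize = 500                    // mirrors DefaultEntityRequestCacheSize
	const maxMessageBytes = 10 * 1000 * 1000 // assumed 10MB per message

	total := worstCaseQueueBytes(queueSize, maxMessageBytes)
	fmt.Printf("worst case: %.1f GB\n", float64(total)/1e9)
}
```

Plugging in a different queue size or message bound gives a quick check of whether a node's memory budget can absorb a full queue.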

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	BatchInterval  time.Duration // minimum interval between requests
	BatchThreshold uint          // maximum batch size for one request
	RetryInitial   time.Duration // interval after which we retry request for an entity
	RetryFunction  RetryFunc     // function determining growth of retry interval
	RetryMaximum   time.Duration // maximum interval for retrying request for an entity
	RetryAttempts  uint          // maximum amount of request attempts per entity
}

type CreateFunc

type CreateFunc func() flow.Entity

CreateFunc is a function that creates a `flow.Entity` with an underlying type so that we can properly decode entities transmitted over the network.

type Engine

type Engine struct {
	*component.ComponentManager
	// contains filtered or unexported fields
}

Engine is a generic requester engine, handling the requesting of entities on the flow network. It is the `request` part of the request-reply pattern provided by the pair of generic exchange engines. All exported methods are concurrency safe.

func New

func New(
	log zerolog.Logger,
	metrics module.EngineMetrics,
	net network.EngineRegistry,
	me module.Local,
	state protocol.State,
	requestQueue engine.MessageStore,
	channel channels.Channel,
	selector flow.IdentityFilter[flow.Identity],
	create CreateFunc,
	options ...OptionFunc,
) (*Engine, error)

New creates a new requester engine, operating on the provided network channel, and requesting entities from a node within the set obtained by applying the provided selector filter. The options allow customization of the parameters related to the batch and retry logic.

IMPORTANT:

  • The injected engine.MessageStore is used to queue incoming responses from potentially byzantine peers. The backing implementation must be fully BFT, including resilience against resource exhaustion attacks and targeted cache eviction attacks. Hero data structures are generally not suitable, as most of them are not BFT at the time of writing (see www.notion.so/flowfoundation/Intro-to-heap-friendly-hero-structures-d1e420752ce6470f857e848ad1e60213 ).
  • Challenging, borderline overload scenarios should be anticipated. The injected engine.MessageStore must have bounded size and drop messages when full (instead of blocking). The requester engine will log warnings when messages are dropped.

No error returns are expected during normal operations.
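The "bounded size, drop when full" requirement on the injected store can be illustrated with a buffered channel. This is only a sketch of that one property: `boundedQueue`, `message`, `Put`, and `Get` are hypothetical names, not the `engine.MessageStore` interface, and the sketch makes no attempt at the BFT properties (for example, resistance to targeted eviction) that the note above also demands.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a queued network response.
type message struct {
	originID string
	payload  []byte
}

// boundedQueue holds at most a fixed number of messages and never blocks the producer.
type boundedQueue struct {
	ch chan message
}

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{ch: make(chan message, capacity)}
}

// Put stores msg if there is room and reports whether it was accepted.
// When the queue is full, the message is dropped instead of blocking.
func (q *boundedQueue) Put(msg message) bool {
	select {
	case q.ch <- msg:
		return true
	default:
		return false
	}
}

// Get returns the oldest queued message, or false if the queue is empty.
func (q *boundedQueue) Get() (message, bool) {
	select {
	case msg := <-q.ch:
		return msg, true
	default:
		return message{}, false
	}
}

func main() {
	q := newBoundedQueue(2)
	for i := 0; i < 3; i++ {
		ok := q.Put(message{originID: fmt.Sprintf("node-%d", i)})
		fmt.Printf("message %d accepted: %v\n", i, ok)
	}
}
```

The caller sees a `false` return for the third message and can log a warning, which matches the dropping behavior described above.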

func (*Engine) EntityByID

func (e *Engine) EntityByID(entityID flow.Identifier, selector flow.IdentityFilter[flow.Identity])

EntityByID will enqueue the given entity for request by its ID (content hash). We permit requesting data only from non-ejected, staked nodes (excluding observer variants of roles and the requesting node itself). The selector will be applied to the resulting set of peers. This allows finer-grained control over which providers to request from on a per-entity basis. Use `filter.Any` if no additional restrictions are required. Received entities will be verified for integrity using their ID function. Idempotent w.r.t. `queryKey` (if a prior request is still ongoing, we just continue trying). Concurrency safe.
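The integrity check for content-addressed entities amounts to recomputing the ID of the received entity and comparing it with the requested ID. The sketch below shows that idea with stand-in types: `identifier`, `entity`, and `verify` are hypothetical, and SHA-256 is used purely for illustration (it is an assumption, not the hash function of `flow.Entity`).

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// identifier is a hypothetical 32-byte content hash.
type identifier [32]byte

// entity is a hypothetical stand-in for a network-transmitted entity.
type entity struct {
	body []byte
}

// ID derives the identifier from the entity's content.
// SHA-256 is assumed here for illustration only.
func (e entity) ID() identifier {
	return sha256.Sum256(e.body)
}

// verify reports whether the received entity matches the ID that was requested.
func verify(requested identifier, received entity) bool {
	return received.ID() == requested
}

func main() {
	wanted := entity{body: []byte("collection-1")}
	requested := wanted.ID()

	fmt.Println("honest response accepted:", verify(requested, wanted))
	fmt.Println("tampered response accepted:", verify(requested, entity{body: []byte("forged")}))
}
```

Because the ID is a function of the content, a byzantine provider cannot substitute different data without the comparison failing.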

func (*Engine) EntityBySecondaryKey

func (e *Engine) EntityBySecondaryKey(key flow.Identifier, selector flow.IdentityFilter[flow.Identity])

EntityBySecondaryKey will enqueue the given entity for request by some secondary identifier (NOT its content hash). We permit requesting data only from non-ejected, staked nodes (excluding observer variants of roles and the requesting node itself). The selector will be applied to the resulting set of peers. This allows finer-grained control over which providers to request from on a per-entity basis. Use `filter.Any` if no additional restrictions are required. It is the CALLER's RESPONSIBILITY to verify the integrity (and authenticity, if applicable) of the received data, which might be provided by a byzantine peer. Idempotent w.r.t. `queryKey` (if a prior request is still ongoing, we just continue trying). Concurrency safe.
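Idempotency with respect to the query key can be pictured as a mutex-guarded map that ignores a key which is already pending. The following is a sketch under that assumption; `pendingRequests`, `add`, and `size` are hypothetical names, and the engine's real bookkeeping is not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// pendingRequests tracks outstanding requests by query key.
// Hypothetical structure for illustration only.
type pendingRequests struct {
	mu    sync.Mutex
	items map[string]uint // query key -> number of attempts so far
}

func newPendingRequests() *pendingRequests {
	return &pendingRequests{items: make(map[string]uint)}
}

// add registers queryKey unless a request for it is already pending.
// It reports whether a new entry was created.
func (p *pendingRequests) add(queryKey string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, exists := p.items[queryKey]; exists {
		return false // prior request still ongoing; keep trying with the existing entry
	}
	p.items[queryKey] = 0
	return true
}

// size returns the number of pending requests.
func (p *pendingRequests) size() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.items)
}

func main() {
	p := newPendingRequests()
	fmt.Println("first add created entry:", p.add("key-1"))
	fmt.Println("second add created entry:", p.add("key-1"))
	fmt.Println("pending requests:", p.size())
}
```

Repeated calls with the same key leave exactly one pending entry, so callers do not need to deduplicate before enqueueing.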

func (*Engine) Force

func (e *Engine) Force()

Force will force the requester engine to dispatch all currently valid batch requests. This method does not block; requests are checked asynchronously. Repeated calls are no-ops as long as one forced request is ongoing.
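A common Go idiom for a non-blocking trigger whose repeated calls collapse into one is a channel with capacity one combined with a `select` that has a `default` branch. The sketch below shows that idiom; `notifier`, `notify`, and `consume` are hypothetical names, and whether the engine is implemented this way is an assumption.

```go
package main

import "fmt"

// notifier records at most one pending signal for a background worker.
type notifier struct {
	ch chan struct{}
}

func newNotifier() *notifier {
	return &notifier{ch: make(chan struct{}, 1)}
}

// notify records a signal without blocking.
// It reports false if a signal was already pending, in which case the call is a no-op.
func (n *notifier) notify() bool {
	select {
	case n.ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// consume takes the pending signal, if any, as a worker would before dispatching.
func (n *notifier) consume() bool {
	select {
	case <-n.ch:
		return true
	default:
		return false
	}
}

func main() {
	n := newNotifier()
	fmt.Println("first notify recorded:", n.notify())
	fmt.Println("second notify recorded:", n.notify())
	fmt.Println("worker consumed signal:", n.consume())
	fmt.Println("notify after consume recorded:", n.notify())
}
```

The caller never waits on the worker, and any number of calls made while a signal is pending result in a single dispatch.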

func (*Engine) Process

func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error

Process queues the given message from the node with the given origin ID for asynchronous processing. If the injected `requestQueue` is full, the message is dropped and a warning is logged. For inputs of unexpected type, a warning is logged and the message is dropped.

No error returns are expected during normal operations.

func (*Engine) WithHandle

func (e *Engine) WithHandle(handle HandleFunc)

WithHandle sets the entityConsumer function of the requester, which is how it processes returned entities. The engine cannot be started without setting the entityConsumer function. This is done in a separate call so that the requester can be injected into engines upon construction, and each such engine can then provide an entityConsumer function to the requester itself.
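The two-step wiring described here (construct the requester first, let the consuming engine register its handler afterwards) can be sketched as follows. All types in this sketch are simplified, hypothetical stand-ins: `requester`, `consumer`, `withHandle`, and `start` mirror the pattern only and use strings in place of `flow.Identifier` and `flow.Entity`.

```go
package main

import (
	"errors"
	"fmt"
)

// handleFunc mirrors the shape of HandleFunc with simplified parameter types.
type handleFunc func(originID string, entity string)

// requester is a hypothetical stand-in for the requester engine.
type requester struct {
	handle handleFunc
}

func (r *requester) withHandle(h handleFunc) { r.handle = h }

// start refuses to run until a handler has been registered.
func (r *requester) start() error {
	if r.handle == nil {
		return errors.New("handle function must be set before start")
	}
	return nil
}

// consumer is a hypothetical engine that receives the requester at construction.
type consumer struct {
	received []string
}

// newConsumer registers the consumer's own method as the requester's handler.
func newConsumer(req *requester) *consumer {
	c := &consumer{}
	req.withHandle(c.onEntity)
	return c
}

func (c *consumer) onEntity(originID string, entity string) {
	c.received = append(c.received, entity)
}

func main() {
	req := &requester{}
	fmt.Println("start before wiring:", req.start())

	c := newConsumer(req)
	fmt.Println("start after wiring:", req.start())

	req.handle("node-1", "entity-A")
	fmt.Println("entities received:", len(c.received))
}
```

Separating construction from handler registration avoids a circular dependency: the consumer needs the requester to exist, and the requester needs a method of the consumer.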

type HandleFunc

type HandleFunc func(originID flow.Identifier, entity flow.Entity)

HandleFunc is a function provided to the requester engine to consume an entity once it has been retrieved from a provider. The function should be non-blocking, and errors should be handled internally within the function.

type OptionFunc

type OptionFunc func(*Config)
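OptionFunc follows the functional options pattern: each option is a function that mutates a Config, and the constructor applies them in order on top of its defaults. The sketch below reproduces the pattern with a reduced, local copy of the config; the lowercase names and the default values are placeholders chosen for illustration, not the package's actual defaults.

```go
package main

import (
	"fmt"
	"time"
)

// config is a reduced, local mirror of Config for illustration.
type config struct {
	BatchInterval  time.Duration
	BatchThreshold uint
	RetryAttempts  uint
}

// optionFunc mirrors the shape of OptionFunc.
type optionFunc func(*config)

func withBatchThreshold(threshold uint) optionFunc {
	return func(c *config) { c.BatchThreshold = threshold }
}

func withRetryAttempts(attempts uint) optionFunc {
	return func(c *config) { c.RetryAttempts = attempts }
}

// buildConfig starts from placeholder defaults and applies each option in order.
func buildConfig(options ...optionFunc) config {
	cfg := config{
		BatchInterval:  time.Second, // placeholder default
		BatchThreshold: 32,          // placeholder default
		RetryAttempts:  3,           // placeholder default
	}
	for _, apply := range options {
		apply(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(withBatchThreshold(8), withRetryAttempts(0))
	fmt.Printf("%+v\n", cfg)
}
```

Fields that no option touches keep their default values, so callers only specify what they want to change.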

func WithBatchInterval

func WithBatchInterval(interval time.Duration) OptionFunc

WithBatchInterval sets a custom interval at which we scan for pending items and batch them for requesting.

func WithBatchThreshold

func WithBatchThreshold(threshold uint) OptionFunc

WithBatchThreshold sets a custom threshold for the maximum size of a batch. If we have the given amount of pending items, we immediately send a batch.
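One way to picture the batch threshold is as a cap on how many pending items go into a single request. The sketch below splits a list of pending keys into batches no larger than the threshold; `splitIntoBatches` is a hypothetical helper, and the engine's actual batching and peer selection logic is not described on this page.

```go
package main

import "fmt"

// splitIntoBatches groups pending keys into batches of at most threshold items.
// Hypothetical helper for illustration only.
func splitIntoBatches(pending []string, threshold int) [][]string {
	if threshold <= 0 {
		return nil
	}
	var batches [][]string
	for len(pending) > 0 {
		n := threshold
		if len(pending) < n {
			n = len(pending)
		}
		batches = append(batches, pending[:n])
		pending = pending[n:]
	}
	return batches
}

func main() {
	pending := []string{"a", "b", "c", "d", "e"}
	for i, batch := range splitIntoBatches(pending, 2) {
		fmt.Printf("batch %d: %v\n", i, batch)
	}
}
```

With five pending items and a threshold of two, the first two batches are full and the remainder goes into a smaller third batch.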

func WithRetryAttempts

func WithRetryAttempts(attempts uint) OptionFunc

WithRetryAttempts sets the number of attempts we will make before we give up on retrying. Use zero for infinite retries.

func WithRetryFunction

func WithRetryFunction(retry RetryFunc) OptionFunc

WithRetryFunction sets the function that determines how the retry interval grows.

func WithRetryInitial

func WithRetryInitial(interval time.Duration) OptionFunc

WithRetryInitial sets the initial interval for dispatching a request for the second time.

func WithRetryMaximum

func WithRetryMaximum(interval time.Duration) OptionFunc

WithRetryMaximum sets the maximum interval between retries.

type Request

type Request struct {
	QueryKey      flow.Identifier                    // the key used to identify the requested entity (content hash or secondary key)
	NumAttempts   uint                               // number of times the entity was requested
	LastRequested time.Time                          // approximate timestamp of last request
	RetryAfter    time.Duration                      // interval until request should be retried
	ExtraSelector flow.IdentityFilter[flow.Identity] // additional filters for providers of this entity
	// contains filtered or unexported fields
}

type RetryFunc

type RetryFunc func(time.Duration) time.Duration

func RetryConstant

func RetryConstant() RetryFunc

func RetryExponential

func RetryExponential(exponent float64) RetryFunc

func RetryGeometric

func RetryGeometric(factor float64) RetryFunc

func RetryLinear

func RetryLinear(increase time.Duration) RetryFunc
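The retry constructors are undocumented here, so the following is a sketch of what their names suggest rather than a statement of their implementation: a constant policy returns the interval unchanged, a linear one adds a fixed increase, and a geometric one multiplies by a factor. RetryExponential is left out because its semantics cannot be inferred from the signature alone. The lowercase functions and the `schedule` helper are hypothetical, and the capping at a maximum follows the `RetryMaximum` field description.

```go
package main

import (
	"fmt"
	"time"
)

// retryFunc mirrors the shape of RetryFunc: it maps the current interval to the next one.
type retryFunc func(time.Duration) time.Duration

// retryConstant keeps the interval unchanged (assumed semantics).
func retryConstant() retryFunc {
	return func(interval time.Duration) time.Duration { return interval }
}

// retryLinear adds a fixed increase on every retry (assumed semantics).
func retryLinear(increase time.Duration) retryFunc {
	return func(interval time.Duration) time.Duration { return interval + increase }
}

// retryGeometric multiplies the interval by factor on every retry (assumed semantics).
func retryGeometric(factor float64) retryFunc {
	return func(interval time.Duration) time.Duration {
		return time.Duration(float64(interval) * factor)
	}
}

// schedule returns the first n retry intervals, starting at initial and capped at maximum.
func schedule(next retryFunc, initial, maximum time.Duration, n int) []time.Duration {
	intervals := make([]time.Duration, 0, n)
	interval := initial
	for i := 0; i < n; i++ {
		intervals = append(intervals, interval)
		interval = next(interval)
		if interval > maximum {
			interval = maximum
		}
	}
	return intervals
}

func main() {
	fmt.Println("constant: ", schedule(retryConstant(), time.Second, 5*time.Second, 4))
	fmt.Println("linear:   ", schedule(retryLinear(time.Second), time.Second, 5*time.Second, 4))
	fmt.Println("geometric:", schedule(retryGeometric(2), time.Second, 5*time.Second, 4))
}
```

Under these assumptions, a geometric policy with factor 2 starting at one second reaches the five-second cap on the fourth attempt, while the linear policy is still at four seconds.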
