Documentation ¶
Index ¶
- Constants
- type Config
- type CreateFunc
- type Engine
- func (e *Engine) EntityByID(entityID flow.Identifier, selector flow.IdentityFilter[flow.Identity])
- func (e *Engine) EntityBySecondaryKey(key flow.Identifier, selector flow.IdentityFilter[flow.Identity])
- func (e *Engine) Force()
- func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error
- func (e *Engine) WithHandle(handle HandleFunc)
- type HandleFunc
- type OptionFunc
- func WithBatchInterval(interval time.Duration) OptionFunc
- func WithBatchThreshold(threshold uint) OptionFunc
- func WithRetryAttempts(attempts uint) OptionFunc
- func WithRetryFunction(retry RetryFunc) OptionFunc
- func WithRetryInitial(interval time.Duration) OptionFunc
- func WithRetryMaximum(interval time.Duration) OptionFunc
- type Request
- type RetryFunc
Constants ¶
const DefaultEntityRequestCacheSize = 500
DefaultEntityRequestCacheSize is the default maximum message queue size for the requester engine. Assuming a maximum size of 10MB per message, a full queue would consume ~5GB of memory (10MB × 500). While most messages (such as execution receipts) are significantly smaller than 10MB, some messages, like chunk data packs, can be significantly larger. Users should tune this parameter for their use case and ensure that enough memory is available.
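The memory estimate above is a simple product of queue length and per-message size. The sketch below reproduces that arithmetic so it can be rerun with other values; `worstCaseQueueBytes` is a hypothetical helper, not part of this package, and the 10MB figure is the assumption stated in the text.

```go
package main

import "fmt"

// worstCaseQueueBytes returns the memory a full queue could hold if every
// queued message had the assumed maximum size. Hypothetical helper.
func worstCaseQueueBytes(cacheSize, maxMessageBytes uint64) uint64 {
	return cacheSize * maxMessageBytes
}

func main() {
	const cacheSize = 500               // mirrors DefaultEntityRequestCacheSize
	const maxMessageBytes = 10_000_000 // 10MB per message, as assumed in the text

	total := worstCaseQueueBytes(cacheSize, maxMessageBytes)
	fmt.Printf("worst case: %.1f GB\n", float64(total)/1e9)
}
```

Doubling the cache size doubles the worst-case footprint, so the queue size should be chosen together with the node's memory budget.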
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	BatchInterval  time.Duration // minimum interval between requests
	BatchThreshold uint          // maximum batch size for one request
	RetryInitial   time.Duration // interval after which we retry request for an entity
	RetryFunction  RetryFunc     // function determining growth of retry interval
	RetryMaximum   time.Duration // maximum interval for retrying request for an entity
	RetryAttempts  uint          // maximum number of request attempts per entity
}
type CreateFunc ¶
CreateFunc is a function that creates a `flow.Entity` with an underlying type so that we can properly decode entities transmitted over the network.
type Engine ¶
type Engine struct {
*component.ComponentManager
// contains filtered or unexported fields
}
Engine is a generic requester engine, handling the requesting of entities on the flow network. It is the `request` part of the request-reply pattern provided by the pair of generic exchange engines. All exported methods are concurrency safe.
func New ¶
func New(
	log zerolog.Logger,
	metrics module.EngineMetrics,
	net network.EngineRegistry,
	me module.Local,
	state protocol.State,
	requestQueue engine.MessageStore,
	channel channels.Channel,
	selector flow.IdentityFilter[flow.Identity],
	create CreateFunc,
	options ...OptionFunc,
) (*Engine, error)
New creates a new requester engine, operating on the provided network channel, and requesting entities from a node within the set obtained by applying the provided selector filter. The options allow customization of the parameters related to the batch and retry logic.
IMPORTANT:
- The injected engine.MessageStore is used to queue incoming responses from potentially byzantine peers. The backing implementation must be fully BFT, including resilience against resource exhaustion attacks and targeted cache eviction attacks. Hero data structures are generally not suitable, as most of them are not BFT at the time of writing (see www.notion.so/flowfoundation/Intro-to-heap-friendly-hero-structures-d1e420752ce6470f857e848ad1e60213 ).
- Challenging, borderline overload scenarios should be anticipated. The injected engine.MessageStore must have bounded size and drop messages when full (instead of blocking). The requester engine will log warnings when messages are dropped.
No error returns are expected during normal operations.
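The requirement that the injected queue be bounded and drop messages when full (instead of blocking) can be illustrated with a small sketch. `boundedQueue` below is a hypothetical type that carries plain strings; it does not implement `engine.MessageStore`, and it shows only the bounded, non-blocking behavior, not the resilience against targeted eviction attacks that a BFT implementation also needs.

```go
package main

import (
	"fmt"
	"sync"
)

// boundedQueue is a hypothetical FIFO queue with a hard size limit.
type boundedQueue struct {
	mu    sync.Mutex
	items []string
	limit int
}

func newBoundedQueue(limit int) *boundedQueue {
	return &boundedQueue{limit: limit}
}

// Put stores the message unless the queue is full. It never waits for
// space; a false return means the message was dropped.
func (q *boundedQueue) Put(msg string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) >= q.limit {
		return false
	}
	q.items = append(q.items, msg)
	return true
}

// Get removes and returns the oldest message, if there is one.
func (q *boundedQueue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return "", false
	}
	msg := q.items[0]
	q.items = q.items[1:]
	return msg, true
}

func main() {
	q := newBoundedQueue(2)
	for _, msg := range []string{"a", "b", "c"} {
		if !q.Put(msg) {
			fmt.Println("queue full, dropped:", msg)
		}
	}
}
```

Dropping new arrivals keeps memory use bounded under overload; the caller decides whether to log the drop, as the requester engine does with a warning.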
func (*Engine) EntityByID ¶
func (e *Engine) EntityByID(entityID flow.Identifier, selector flow.IdentityFilter[flow.Identity])
EntityByID enqueues the given entity for request by its ID (content hash). Data is requested only from non-ejected, staked nodes (excluding observer variants of roles and the requesting node itself). The selector is then applied to that set of peers, which allows finer-grained control over which providers to request from on a per-entity basis. Use `filter.Any` if no additional restrictions are required. Received entities are verified for integrity using their ID function. The call is idempotent with respect to the query key (here `entityID`): if a prior request is still ongoing, the engine simply continues trying. Concurrency safe.
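Because the requested ID is a content hash, a response can be checked by recomputing the hash of the received data and comparing it with the ID that was requested. The sketch below illustrates that idea only. SHA-256 and the helper names `contentID` and `matchesRequestedID` are assumptions for illustration; they are not Flow's hashing scheme or this package's API.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// contentID is a stand-in for an ID derived from an entity's content.
// SHA-256 is an assumption made for this example only.
func contentID(payload []byte) []byte {
	sum := sha256.Sum256(payload)
	return sum[:]
}

// matchesRequestedID reports whether the received payload hashes to the
// ID that was originally requested.
func matchesRequestedID(requested, payload []byte) bool {
	return bytes.Equal(requested, contentID(payload))
}

func main() {
	honest := []byte("entity payload")
	requested := contentID(honest)

	fmt.Println(matchesRequestedID(requested, honest))                     // true
	fmt.Println(matchesRequestedID(requested, []byte("tampered payload"))) // false
}
```

A check of this kind is only possible when the query key is derived from the content itself.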
func (*Engine) EntityBySecondaryKey ¶
func (e *Engine) EntityBySecondaryKey(key flow.Identifier, selector flow.IdentityFilter[flow.Identity])
EntityBySecondaryKey enqueues the given entity for request by some secondary identifier (NOT its content hash). Data is requested only from non-ejected, staked nodes (excluding observer variants of roles and the requesting node itself). The selector is then applied to that set of peers, which allows finer-grained control over which providers to request from on a per-entity basis. Use `filter.Any` if no additional restrictions are required. It is the CALLER's RESPONSIBILITY to verify the integrity (and authenticity, if applicable) of the received data, which might be provided by a byzantine peer. The call is idempotent with respect to the query key (here `key`): if a prior request is still ongoing, the engine simply continues trying. Concurrency safe.
func (*Engine) Force ¶
func (e *Engine) Force()
Force makes the requester engine dispatch all currently valid batch requests. This method does not block; requests are checked asynchronously. Repeated calls are no-ops as long as one forced request is ongoing.
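A common way to get this behavior (non-blocking, with repeated calls coalesced while one request is pending) is a buffered channel of capacity one combined with a `select` that has a `default` branch. The sketch below shows that pattern; `forceSignal` and its methods are hypothetical, and the engine's actual mechanism is not documented here.

```go
package main

import "fmt"

// forceSignal coalesces force requests: at most one can be pending.
type forceSignal struct {
	pending chan struct{}
}

func newForceSignal() *forceSignal {
	return &forceSignal{pending: make(chan struct{}, 1)}
}

// Force records a force request without blocking. It returns false when a
// request is already pending, in which case the call is a no-op.
func (s *forceSignal) Force() bool {
	select {
	case s.pending <- struct{}{}:
		return true
	default:
		return false
	}
}

// Done marks the pending request as served so a new one can be recorded.
func (s *forceSignal) Done() {
	select {
	case <-s.pending:
	default:
	}
}

func main() {
	s := newForceSignal()
	fmt.Println(s.Force()) // true: request recorded
	fmt.Println(s.Force()) // false: one is already pending
	s.Done()
	fmt.Println(s.Force()) // true: previous request was served
}
```

In a real engine a worker goroutine would receive from the channel and dispatch the batches, so callers of `Force` never wait on network activity.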
func (*Engine) Process ¶
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error
Process queues the given message from the node with the given origin ID for asynchronous processing. If the injected `requestQueue` is full, the message is dropped and a warning is logged. For inputs of unexpected type, a warning is logged and the message is dropped.
No error returns are expected during normal operations.
func (*Engine) WithHandle ¶
func (e *Engine) WithHandle(handle HandleFunc)
WithHandle sets the entityConsumer function of the requester, which is how it processes returned entities. The engine cannot be started without setting the entityConsumer function. This is done in a separate call so that the requester can be injected into an engine upon construction, and that engine can then provide its own entityConsumer function to the requester.
type HandleFunc ¶
type HandleFunc func(originID flow.Identifier, entity flow.Entity)
HandleFunc is a function provided to the requester engine to consume an entity once it has been retrieved from a provider. The function should be non-blocking, and errors should be handled internally within the function.
type OptionFunc ¶
type OptionFunc func(*Config)
func WithBatchInterval ¶
func WithBatchInterval(interval time.Duration) OptionFunc
WithBatchInterval sets a custom interval at which we scan for pending items and batch them for requesting.
func WithBatchThreshold ¶
func WithBatchThreshold(threshold uint) OptionFunc
WithBatchThreshold sets a custom threshold for the maximum size of a batch. Once the given number of items is pending, a batch is sent immediately.
func WithRetryAttempts ¶
func WithRetryAttempts(attempts uint) OptionFunc
WithRetryAttempts sets the number of attempts we will make before we give up on retrying. Use zero for infinite retries.
func WithRetryFunction ¶
func WithRetryFunction(retry RetryFunc) OptionFunc
WithRetryFunction sets the function that determines how the retry interval grows.
func WithRetryInitial ¶
func WithRetryInitial(interval time.Duration) OptionFunc
WithRetryInitial sets the initial interval for dispatching a request for the second time.
func WithRetryMaximum ¶
func WithRetryMaximum(interval time.Duration) OptionFunc
WithRetryMaximum sets the maximum interval between retries.
type Request ¶
type Request struct {
QueryKey flow.Identifier // the key used to identify the requested entity (content hash or secondary key)
NumAttempts uint // number of times the entity was requested
LastRequested time.Time // approximate timestamp of last request
RetryAfter time.Duration // interval until request should be retried
ExtraSelector flow.IdentityFilter[flow.Identity] // additional filters for providers of this entity
// contains filtered or unexported fields
}
type RetryFunc ¶
func RetryConstant ¶
func RetryConstant() RetryFunc