requester

package
v0.43.0-rc.1
Warning

This package is not in the latest version of its module.

Published: Sep 12, 2025 License: AGPL-3.0 Imports: 17 Imported by: 6

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	BatchInterval   time.Duration // minimum interval between requests
	BatchThreshold  uint          // maximum batch size for one request
	RetryInitial    time.Duration // interval after which we retry request for an entity
	RetryFunction   RetryFunc     // function determining growth of retry interval
	RetryMaximum    time.Duration // maximum interval for retrying request for an entity
	RetryAttempts   uint          // maximum amount of request attempts per entity
	ValidateStaking bool          // should staking of target/origin be checked
}

type CreateFunc

type CreateFunc func() flow.Entity

CreateFunc is a function that creates a `flow.Entity` with an underlying type so that we can properly decode entities transmitted over the network.
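The following is a minimal sketch of why such a factory helps with decoding: the decoder needs an empty value of the concrete type to unmarshal into. The `Entity` interface, the `Collection` type, the `decode` helper, and the use of JSON are all assumptions made for illustration; they are simplified stand-ins and not the package's actual types or wire encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Entity is a simplified, hypothetical stand-in for flow.Entity.
type Entity interface {
	ID() string
}

// Collection is a hypothetical concrete entity type used only in this sketch.
type Collection struct {
	Name string `json:"name"`
}

// ID returns a made-up identifier derived from the collection name.
func (c *Collection) ID() string { return "collection/" + c.Name }

// CreateFunc mirrors the shape of requester.CreateFunc, using the stand-in Entity.
type CreateFunc func() Entity

// decode asks the factory for an empty value of the concrete type and
// unmarshals the payload into it. JSON is an assumption of this sketch.
func decode(create CreateFunc, payload []byte) (Entity, error) {
	entity := create()
	if err := json.Unmarshal(payload, entity); err != nil {
		return nil, fmt.Errorf("could not decode entity: %w", err)
	}
	return entity, nil
}

func main() {
	create := func() Entity { return &Collection{} }
	entity, err := decode(create, []byte(`{"name":"example"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(entity.ID())
}
```

Without the factory, the decoder would only know the interface type and could not pick a concrete struct to fill in.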

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is a generic requester engine, handling the requesting of entities on the flow network. It is the `request` part of the request-reply pattern provided by the pair of generic exchange engines.

func New

func New(log zerolog.Logger, metrics module.EngineMetrics, net network.EngineRegistry, me module.Local, state protocol.State,
	channel channels.Channel, selector flow.IdentityFilter[flow.Identity], create CreateFunc, options ...OptionFunc) (*Engine, error)

New creates a new requester engine, operating on the provided network channel, and requesting entities from a node within the set obtained by applying the provided selector filter. The options allow customization of the parameters related to the batch and retry logic.

func (*Engine) Done

func (e *Engine) Done() <-chan struct{}

Done returns a done channel that is closed once the engine has fully stopped. For the consensus engine, we wait for hotstuff to finish.

func (*Engine) EntityByID

func (e *Engine) EntityByID(entityID flow.Identifier, selector flow.IdentityFilter[flow.Identity])

EntityByID adds an entity to the list of entities to be requested from the provider. It is idempotent, meaning that adding the same entity to the requester engine multiple times has no effect, unless the item has expired due to too many requests and has thus been deleted from the list. The provided selector will be applied to the set of valid providers on top of the global selector injected upon construction. It allows for finer-grained control over which subset of providers to request a given entity from, such as selection of a collection cluster. Use `filter.Any` if no additional selection is required. EntityByID checks the integrity of the response to make sure that the entity received is the one that was requested.
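The two properties described above, idempotent registration and an integrity check on the response, can be illustrated with a small self-contained sketch. The `pending` map, the `contentID` helper, and the SHA-256 hashing scheme are assumptions chosen for this example; the engine's internal bookkeeping and ID derivation are not shown in this documentation.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// pending is a hypothetical stand-in for the list of entities to request.
type pending map[string]struct{}

// add registers an ID and reports whether it was newly added.
// Adding an ID that is already present has no effect.
func (p pending) add(id string) bool {
	if _, ok := p[id]; ok {
		return false
	}
	p[id] = struct{}{}
	return true
}

// contentID derives an ID from the content. The hashing scheme is an
// assumption of this sketch.
func contentID(content []byte) string {
	sum := sha256.Sum256(content)
	return hex.EncodeToString(sum[:])
}

// accept recomputes the ID of the received content and only accepts it if
// that ID was actually requested. Accepted items are removed from the list.
func (p pending) accept(content []byte) bool {
	id := contentID(content)
	if _, ok := p[id]; !ok {
		return false
	}
	delete(p, id)
	return true
}

func main() {
	p := pending{}
	content := []byte("example entity")
	id := contentID(content)

	fmt.Println(p.add(id))                 // first add registers the ID
	fmt.Println(p.add(id))                 // second add is a no-op
	fmt.Println(p.accept([]byte("other"))) // content with a different ID is rejected
	fmt.Println(p.accept(content))         // matching content is accepted
}
```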

func (*Engine) Force

func (e *Engine) Force()

Force will force the requester engine to dispatch all currently valid batch requests.

func (*Engine) Process

func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message interface{}) error

Process processes the given message from the node with the given origin ID in a blocking manner. It returns the potential processing error when done.

func (*Engine) ProcessLocal

func (e *Engine) ProcessLocal(message interface{}) error

ProcessLocal processes a message originating on the local node.

func (*Engine) Query added in v0.15.0

func (e *Engine) Query(key flow.Identifier, selector flow.IdentityFilter[flow.Identity])

Query will request data through the request engine backing the interface. The additional selector will be applied to the subset of valid providers for the data and allows finer-grained control over which providers to request data from. Query does not perform an integrity check, so it can be used to get entities without knowing their ID.

func (*Engine) Ready

func (e *Engine) Ready() <-chan struct{}

Ready returns a ready channel that is closed once the engine has fully started. For the consensus engine, this is true once the underlying consensus algorithm has started.

func (*Engine) Submit

func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, message interface{})

Submit submits the given message from the node with the given origin ID for processing in a non-blocking manner. It returns instantly and logs a potential processing error internally when done.

func (*Engine) SubmitLocal

func (e *Engine) SubmitLocal(message interface{})

SubmitLocal submits a message originating on the local node.

func (*Engine) WithHandle

func (e *Engine) WithHandle(handle HandleFunc)

WithHandle sets the handle function of the requester, which is how it processes returned entities. The engine cannot be started without setting the handle function. This is done in a separate call so that the requester can be injected into an engine upon construction, and that engine can then provide its own handle function to the requester.

type HandleFunc

type HandleFunc func(originID flow.Identifier, entity flow.Entity)

HandleFunc is a function provided to the requester engine to handle an entity once it has been retrieved from a provider. The function should be non-blocking and errors should be handled internally within the function.
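One common way to satisfy the non-blocking requirement is to hand the entity to a buffered channel and drop it when the buffer is full. The sketch below shows that pattern under stated assumptions: `Entity`, `HandleFunc`, `record`, and `queueingHandler` are local stand-ins with plain string identifiers, not the package's types, and dropping on overflow is only one possible policy.

```go
package main

import "fmt"

// Entity and HandleFunc are simplified stand-ins for flow.Entity and
// requester.HandleFunc; identifiers are plain strings in this sketch.
type Entity interface{ ID() string }

type HandleFunc func(originID string, entity Entity)

// record is a hypothetical entity used only for this example.
type record struct{ id string }

func (r record) ID() string { return r.id }

// queueingHandler returns a handler that never blocks: it hands the entity
// to a buffered channel and calls onDrop when the buffer is full.
func queueingHandler(queue chan<- Entity, onDrop func(Entity)) HandleFunc {
	return func(originID string, entity Entity) {
		select {
		case queue <- entity:
			// Entity queued for a worker to process later.
		default:
			// Buffer full: handle the problem here instead of blocking.
			onDrop(entity)
		}
	}
}

func main() {
	queue := make(chan Entity, 1)
	handle := queueingHandler(queue, func(e Entity) {
		fmt.Println("dropped", e.ID())
	})

	handle("origin", record{id: "a"})
	handle("origin", record{id: "b"}) // buffer is full, so this one is dropped
	fmt.Println("queued", (<-queue).ID())
}
```

Since the handler has no return value, any failure has to be logged or counted inside the function, as the `onDrop` callback does here.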

type Item

type Item struct {
	EntityID      flow.Identifier                    // ID for the entity to be requested
	NumAttempts   uint                               // number of times the entity was requested
	LastRequested time.Time                          // approximate timestamp of last request
	RetryAfter    time.Duration                      // interval until request should be retried
	ExtraSelector flow.IdentityFilter[flow.Identity] // additional filters for providers of this entity
	// contains filtered or unexported fields
}

type OptionFunc

type OptionFunc func(*Config)

func WithBatchInterval

func WithBatchInterval(interval time.Duration) OptionFunc

WithBatchInterval sets a custom interval at which we scan for pending items and batch them for requesting.

func WithBatchThreshold

func WithBatchThreshold(threshold uint) OptionFunc

WithBatchThreshold sets a custom threshold for the maximum size of a batch. If we have the given amount of pending items, we immediately send a batch.
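To illustrate what a maximum batch size means in practice, here is a small sketch that splits a list of pending IDs into batches no larger than the threshold. The `batches` helper and the choice to treat a zero threshold as "no splitting" are assumptions of this example; the engine's actual dispatch logic is not shown in this documentation.

```go
package main

import "fmt"

// batches splits the pending IDs into slices of at most threshold entries.
// Treating a zero threshold as "no limit" is an assumption of this sketch.
func batches(ids []string, threshold uint) [][]string {
	if threshold == 0 {
		return [][]string{ids}
	}
	var out [][]string
	for len(ids) > 0 {
		n := int(threshold)
		if n > len(ids) {
			n = len(ids)
		}
		out = append(out, ids[:n])
		ids = ids[n:]
	}
	return out
}

func main() {
	pending := []string{"a", "b", "c", "d", "e"}
	for i, batch := range batches(pending, 2) {
		fmt.Println("batch", i, batch)
	}
}
```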

func WithRetryAttempts

func WithRetryAttempts(attempts uint) OptionFunc

WithRetryAttempts sets the number of attempts we will make before we give up on retrying. Use zero for infinite retries.

func WithRetryFunction

func WithRetryFunction(retry RetryFunc) OptionFunc

WithRetryFunction sets the function that determines how the retry interval grows.

func WithRetryInitial

func WithRetryInitial(interval time.Duration) OptionFunc

WithRetryInitial sets the initial interval for dispatching a request for the second time.

func WithRetryMaximum

func WithRetryMaximum(interval time.Duration) OptionFunc

WithRetryMaximum sets the maximum interval between retries.

func WithValidateStaking added in v0.21.0

func WithValidateStaking(validateStaking bool) OptionFunc

WithValidateStaking sets the flag that determines whether the target and origin must be checked for staking.

type RetryFunc

type RetryFunc func(time.Duration) time.Duration

func RetryConstant

func RetryConstant() RetryFunc

func RetryExponential

func RetryExponential(exponent float64) RetryFunc

func RetryGeometric

func RetryGeometric(factor float64) RetryFunc

func RetryLinear

func RetryLinear(increase time.Duration) RetryFunc
