Documentation
¶
Overview ¶
Package messagequeue implements a queue of want messages to send to peers.
There is a MessageQueue for each peer. The MessageQueue does not enqueue individual outgoing messages, but accumulates information to put into the next message. Each MessageQueue keeps want lists and CIDs to cancel:
- sent/pending peer wants + sent times
- sent/pending broadcast wants + sent times
- cancel CIDs
As messages are added, existing wantlist items may be changed or removed. For example, adding a cancel to the queue for some CIDs also removes any pending wants for those same CIDs. Adding a want will remove a cancel for that CID. If a want already exists then only the type and priority may be adjusted for that same want, so that duplicate messages are not sent.
When enough message updates have accumulated or it has been long enough since the previous message was sent, then send the current message. The message contains wants and cancels up to a limited size, and is sent to the peer. The time that the message was sent is recorded.
If a want has been sent with no response received, for longer than the rebroadcast interval, then the want is moved back to the pending list to be resent to the peer.
When a response is received, the earliest request time is used to calculate the longest latency for updating the message timeout time. The sent times are cleared for CIDs in the response.
Index ¶
- func WithDontHaveTimeoutConfig(dhtConfig *DontHaveTimeoutConfig) option
- func WithPerPeerSendDelay(perPeerDelay time.Duration) option
- type DontHaveTimeoutConfig
- type DontHaveTimeoutManager
- type MessageNetwork
- type MessageQueue
- func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid)
- func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid)
- func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid)
- func (mq *MessageQueue) HasMessage() bool
- func (mq *MessageQueue) RebroadcastNow()
- func (mq *MessageQueue) ResponseReceived(ks []cid.Cid)
- func (mq *MessageQueue) Shutdown()
- func (mq *MessageQueue) Startup()
- type OnDontHaveTimeout
- type PeerConnection
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func WithDontHaveTimeoutConfig ¶ added in v0.26.0
func WithDontHaveTimeoutConfig(dhtConfig *DontHaveTimeoutConfig) option
func WithPerPeerSendDelay ¶ added in v0.27.3
Types ¶
type DontHaveTimeoutConfig ¶ added in v0.26.0
type DontHaveTimeoutConfig struct {
// DontHaveTimeout is used to simulate a DONT_HAVE when communicating with
// a peer whose Bitswap client doesn't support the DONT_HAVE response,
// or when the peer takes too long to respond.
// If the peer doesn't respond to a want-block within the timeout, the
// local node assumes that the peer doesn't have the block.
DontHaveTimeout time.Duration
// MaxExpectedWantProcessTime is the maximum amount of time we expect a
// peer takes to process a want and initiate sending a response to us
MaxExpectedWantProcessTime time.Duration
// MaxTimeout is the maximum allowed timeout, regardless of latency
MaxTimeout time.Duration
// MinTimeout is the minimum allowed timeout, regardless of latency
MinTimeout time.Duration
// PingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
PingLatencyMultiplier int
// MessageLatencyAlpha is the alpha supplied to the message latency EWMA
MessageLatencyAlpha float64
// MessageLatencyMultiplier gives a margin for error. The timeout is calculated as
// MessageLatencyMultiplier * message latency
MessageLatencyMultiplier int
// contains filtered or unexported fields
}
func DefaultDontHaveTimeoutConfig ¶ added in v0.26.0
func DefaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig
type DontHaveTimeoutManager ¶
type DontHaveTimeoutManager interface {
// Start the manager (idempotent)
Start()
// Shutdown the manager (Shutdown is final, manager cannot be restarted)
Shutdown()
// AddPending adds the wants as pending a response. If the are not
// canceled before the timeout, the OnDontHaveTimeout method will be called.
AddPending([]cid.Cid)
// CancelPending removes the wants
CancelPending([]cid.Cid)
// UpdateMessageLatency informs the manager of a new latency measurement
UpdateMessageLatency(time.Duration)
}
DontHaveTimeoutManager pings a peer to estimate latency so it can set a reasonable upper bound on when to consider a DONT_HAVE request as timed out (when connected to a peer that doesn't support DONT_HAVE messages)
type MessageNetwork ¶
type MessageNetwork interface {
Connect(context.Context, peer.AddrInfo) error
NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error)
Latency(peer.ID) time.Duration
Ping(context.Context, peer.ID) ping.Result
Self() peer.ID
}
MessageNetwork is any network that can connect peers and generate a message sender.
type MessageQueue ¶
type MessageQueue struct {
BcastInc func()
// contains filtered or unexported fields
}
MessageQueue implements queue of want messages to send to peers.
func New ¶
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, options ...option) *MessageQueue
New creates a new MessageQueue.
If onDontHaveTimeout is nil, then the dontHaveTimeoutMrg is disabled.
func (*MessageQueue) AddBroadcastWantHaves ¶
func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid)
Add want-haves that are part of a broadcast to all connected peers
func (*MessageQueue) AddCancels ¶
func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid)
Add cancel messages for the given keys.
func (*MessageQueue) AddWants ¶
func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid)
Add want-haves and want-blocks for the peer for this message queue.
func (*MessageQueue) HasMessage ¶ added in v0.32.0
func (mq *MessageQueue) HasMessage() bool
func (*MessageQueue) RebroadcastNow ¶ added in v0.25.0
func (mq *MessageQueue) RebroadcastNow()
func (*MessageQueue) ResponseReceived ¶
func (mq *MessageQueue) ResponseReceived(ks []cid.Cid)
ResponseReceived is called when a message is received from the network. ks is the set of blocks, HAVEs and DONT_HAVEs in the message Note that this is just used to calculate latency.
func (*MessageQueue) Shutdown ¶
func (mq *MessageQueue) Shutdown()
Shutdown stops the processing of messages for a message queue.
func (*MessageQueue) Startup ¶
func (mq *MessageQueue) Startup()
Startup starts the processing of messages and rebroadcasting.
type OnDontHaveTimeout ¶
Fires when a timeout occurs waiting for a response from a peer running an older version of Bitswap that doesn't support DONT_HAVE messages.