messagequeue

package
v0.35.2 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 6, 2025 License: Apache-2.0, MIT Imports: 15 Imported by: 0

Documentation

Overview

Package messagequeue implements a queue of want messages to send to peers.

There is a MessageQueue for each peer. The MessageQueue does not enqueue individual outgoing messages, but accumulates information to put into the next message. Each MessageQueue keeps want lists and CIDs to cancel:

  • sent/pending peer wants + sent times
  • sent/pending broadcast wants + sent times
  • cancel CIDs

As messages are added, existing wantlist items may be changed or removed. For example, adding a cancel to the queue for some CIDs also removes any pending wants for those same CIDs. Adding a want will remove a cancel for that CID. If a want already exists then only the type and priority may be adjusted for that same want, so that duplicate messages are not sent.
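The coalescing rules above can be sketched with two maps. This is a minimal illustration, not the package's implementation: the `queue`, `want`, `addWant`, and `addCancel` names are hypothetical, CIDs are modeled as plain strings, and the split between peer wants and broadcast wants is ignored.

```go
package main

import "fmt"

// wantType distinguishes the two kinds of want entry.
type wantType int

const (
	wantHave wantType = iota
	wantBlock
)

// want is a hypothetical pending wantlist entry.
type want struct {
	typ      wantType
	priority int
}

// queue is a stand-in for the per-peer state; keys are CID strings.
type queue struct {
	pending map[string]want
	cancels map[string]struct{}
}

func newQueue() *queue {
	return &queue{pending: map[string]want{}, cancels: map[string]struct{}{}}
}

// addWant records a want and drops any queued cancel for the same CID.
// An existing want is overwritten in place, so it is never duplicated.
func (q *queue) addWant(c string, w want) {
	delete(q.cancels, c)
	q.pending[c] = w
}

// addCancel records a cancel and drops any pending want for the same CID.
func (q *queue) addCancel(c string) {
	delete(q.pending, c)
	q.cancels[c] = struct{}{}
}

func main() {
	q := newQueue()
	q.addWant("cidA", want{typ: wantHave, priority: 1})
	q.addWant("cidA", want{typ: wantBlock, priority: 5}) // adjusts type and priority only
	q.addWant("cidB", want{typ: wantHave, priority: 1})
	q.addCancel("cidB") // removes the pending want for cidB
	fmt.Println(len(q.pending), len(q.cancels))
}
```

Because state is keyed by CID, the next outgoing message is built from the current maps rather than from a log of individual requests.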

The current message is sent when enough message updates have accumulated, or when enough time has passed since the previous message was sent. The message carries wants and cancels up to a limited size. The time that the message was sent is recorded.

If a want has been sent and no response has been received for longer than the rebroadcast interval, the want is moved back to the pending list to be resent to the peer.

When a response is received, the earliest request time is used to calculate the longest latency for updating the message timeout time. The sent times are cleared for CIDs in the response.

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithDontHaveTimeoutConfig added in v0.26.0

func WithDontHaveTimeoutConfig(dhtConfig *DontHaveTimeoutConfig) option

func WithPerPeerSendDelay added in v0.27.3

func WithPerPeerSendDelay(perPeerDelay time.Duration) option

Types

type DontHaveTimeoutConfig added in v0.26.0

type DontHaveTimeoutConfig struct {
	// DontHaveTimeout is used to simulate a DONT_HAVE when communicating with
	// a peer whose Bitswap client doesn't support the DONT_HAVE response,
	// or when the peer takes too long to respond.
	// If the peer doesn't respond to a want-block within the timeout, the
	// local node assumes that the peer doesn't have the block.
	DontHaveTimeout time.Duration

	// MaxExpectedWantProcessTime is the maximum amount of time we expect a
	// peer to take to process a want and initiate sending a response to us
	MaxExpectedWantProcessTime time.Duration

	// MaxTimeout is the maximum allowed timeout, regardless of latency
	MaxTimeout time.Duration
	// MinTimeout is the minimum allowed timeout, regardless of latency
	MinTimeout time.Duration
	// PingLatencyMultiplier is multiplied by the average ping time to
	// get an upper bound on how long we expect to wait for a peer's response
	// to arrive
	PingLatencyMultiplier int

	// MessageLatencyAlpha is the alpha supplied to the message latency EWMA
	MessageLatencyAlpha float64

	// MessageLatencyMultiplier gives a margin for error. The timeout is calculated as
	// MessageLatencyMultiplier * message latency
	MessageLatencyMultiplier int
	// contains filtered or unexported fields
}

func DefaultDontHaveTimeoutConfig added in v0.26.0

func DefaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig

type DontHaveTimeoutManager

type DontHaveTimeoutManager interface {
	// Start the manager (idempotent)
	Start()
	// Shutdown the manager (Shutdown is final, manager cannot be restarted)
	Shutdown()
	// AddPending adds the wants as pending a response. If they are not
	// canceled before the timeout, the OnDontHaveTimeout method will be called.
	AddPending([]cid.Cid)
	// CancelPending removes the wants
	CancelPending([]cid.Cid)
	// UpdateMessageLatency informs the manager of a new latency measurement
	UpdateMessageLatency(time.Duration)
}

DontHaveTimeoutManager pings a peer to estimate latency, so it can set a reasonable upper bound on when to consider a DONT_HAVE request as timed out (when connected to a peer that doesn't support DONT_HAVE messages).

type MessageNetwork

type MessageNetwork interface {
	Connect(context.Context, peer.AddrInfo) error
	NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error)
	Latency(peer.ID) time.Duration
	Ping(context.Context, peer.ID) ping.Result
	Self() peer.ID
}

MessageNetwork is any network that can connect peers and generate a message sender.

type MessageQueue

type MessageQueue struct {
	BcastInc func()
	// contains filtered or unexported fields
}

MessageQueue implements a queue of want messages to send to peers.

func New

func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, options ...option) *MessageQueue

New creates a new MessageQueue.

If onDontHaveTimeout is nil, then the DontHaveTimeoutManager is disabled.

func (*MessageQueue) AddBroadcastWantHaves

func (mq *MessageQueue) AddBroadcastWantHaves(wantHaves []cid.Cid)

Add want-haves that are part of a broadcast to all connected peers.

func (*MessageQueue) AddCancels

func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid)

Add cancel messages for the given keys.

func (*MessageQueue) AddWants

func (mq *MessageQueue) AddWants(wantBlocks []cid.Cid, wantHaves []cid.Cid)

Add want-haves and want-blocks for the peer for this message queue.

func (*MessageQueue) HasMessage added in v0.32.0

func (mq *MessageQueue) HasMessage() bool

func (*MessageQueue) RebroadcastNow added in v0.25.0

func (mq *MessageQueue) RebroadcastNow()

func (*MessageQueue) ResponseReceived

func (mq *MessageQueue) ResponseReceived(ks []cid.Cid)

ResponseReceived is called when a message is received from the network. ks is the set of blocks, HAVEs and DONT_HAVEs in the message. Note that this is just used to calculate latency.

func (*MessageQueue) Shutdown

func (mq *MessageQueue) Shutdown()

Shutdown stops the processing of messages for a message queue.

func (*MessageQueue) Startup

func (mq *MessageQueue) Startup()

Startup starts the processing of messages and rebroadcasting.

type OnDontHaveTimeout

type OnDontHaveTimeout func(peer.ID, []cid.Cid)

Fires when a timeout occurs waiting for a response from a peer running an older version of Bitswap that doesn't support DONT_HAVE messages.

type PeerConnection

type PeerConnection interface {
	// Ping the peer
	Ping(context.Context) ping.Result
	// The average latency of all pings
	Latency() time.Duration
}

PeerConnection is a connection to a peer that can be pinged and whose average latency can be measured.
