rangesync

package
v1.8.3
Warning

This package is not in the latest version of its module.

Published: Apr 15, 2025 License: MIT Imports: 26 Imported by: 0

Documentation

Overview

Package rangesync implements a pairwise set reconciliation protocol.

The protocol is based on this paper: Range-Based Set Reconciliation by Aljoscha Meyer https://arxiv.org/pdf/2212.13567.pdf

The protocol makes it possible to build reusable synchronization helper structures (ordered sets) and to use the same structure to efficiently reconcile subranges of the ordered sets against multiple peers. Its disadvantage is that the algorithm is not very efficient for large differences (tens of thousands of elements) when the elements have no apparent ordering, as is the case with hash-based IDs such as ATX IDs. To mitigate this problem, the protocol supports exchanging recently received items between peers before the actual reconciliation begins. When the difference is large, the algorithm can automatically degrade to a dumb "send me the whole set" mode. The resulting increase in load on the network and on the peers' computers is mitigated by splitting the element range among multiple peers.

The core concepts are the following:

  1. Ordered sets. The sets to be reconciled are ordered. The elements in the set are also called "keys" because they represent the IDs of the actual objects being synchronized, such as ATXs.
  2. Ranges. A [x, y) range denotes a range of items in the set, where x is inclusive and y is exclusive. Ranges may wrap around:
     - x == y: the whole set
     - x < y: a normal range starting with x and ending below y
     - x > y: a wrapped-around range, that is, from x (inclusive) to the end of the set and from the beginning of the set to y (exclusive)
  3. Fingerprint. Each range has a fingerprint, which is all the IDs (keys) in the range XORed together. The fingerprint is used to quickly check if the range is in sync with the peer.
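A minimal sketch of the range and fingerprint rules above, using plain byte slices as keys. The helper names (inRange, rangeFingerprint, combine) are hypothetical rather than the package's API, and truncating each key to the 12-byte FingerprintSize before XORing is an assumption. The example also shows why splitting a range works: the fingerprints of [x, m) and the wrapped range [m, x) XOR together to the fingerprint of the whole set [x, x).

```go
package main

import (
	"bytes"
	"fmt"
)

// fingerprintSize mirrors the package's FingerprintSize constant (12 bytes).
const fingerprintSize = 12

type fingerprint [fingerprintSize]byte

// inRange reports whether key k falls in [x, y) under the wrap-around rules.
func inRange(x, y, k []byte) bool {
	switch c := bytes.Compare(x, y); {
	case c == 0: // x == y: the whole set
		return true
	case c < 0: // x < y: normal range, x <= k < y
		return bytes.Compare(k, x) >= 0 && bytes.Compare(k, y) < 0
	default: // x > y: wrapped range, k >= x or k < y
		return bytes.Compare(k, x) >= 0 || bytes.Compare(k, y) < 0
	}
}

// rangeFingerprint XORs the first fingerprintSize bytes of every key in [x, y).
// Truncation to fingerprintSize is an assumption made for this sketch.
func rangeFingerprint(keys [][]byte, x, y []byte) fingerprint {
	var fp fingerprint
	for _, k := range keys {
		if !inRange(x, y, k) {
			continue
		}
		for i := 0; i < fingerprintSize && i < len(k); i++ {
			fp[i] ^= k[i]
		}
	}
	return fp
}

// combine XORs two fingerprints, e.g. those of two adjacent, disjoint ranges.
func combine(a, b fingerprint) fingerprint {
	for i := range a {
		a[i] ^= b[i] // a is a copy, so the caller's value is untouched
	}
	return a
}

func main() {
	// One-byte keys keep the example readable; only fp[0] becomes non-zero.
	keys := [][]byte{{0x01}, {0x02}, {0x04}, {0x08}}
	x, m := []byte{0x02}, []byte{0x05}
	left := rangeFingerprint(keys, x, m)  // [0x02, 0x05): keys 0x02, 0x04
	right := rangeFingerprint(keys, m, x) // wrapped [0x05, 0x02): keys 0x08, 0x01
	whole := rangeFingerprint(keys, x, x) // x == y: all keys
	fmt.Println(left[0], right[0], whole[0], combine(left, right) == whole) // 6 9 15 true
}
```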

Each OrderedSet is expected to provide the number of items and the fingerprint of the items in a range relatively cheaply. Additionally, OrderedSet has a Recent method that retrieves recently added keys (elements) based on the provided timestamp. This reconciliation helper mechanism is optional, so Recent may simply return an empty sequence.

Below are logs of sample interactions between two peers, A and B, in several scenarios:

A: empty set; B: empty set

A -> B:

EmptySet
EndRound

B -> A:

Done

A: empty set; B: non-empty set

A -> B:

EmptySet
EndRound

B -> A:

ItemBatch
ItemBatch
...

A -> B:

Done

A: small set (< maxSendRange); B: non-empty set

A -> B:

ItemBatch
ItemBatch
...
RangeContents [x, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...

A -> B:

Done

A: large set; B: non-empty set; maxDiff < 0

A -> B:

Fingerprint [x, y)
EndRound

B -> A:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

A -> B:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

A -> B:

Done

A: large set; B: non-empty set; maxDiff >= 0; differenceMetric <= maxDiff

NOTE: Sample includes fingerprint

A -> B:

Sample [x, y)
EndRound

B -> A:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

A -> B:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

A -> B:

Done

A: large set; B: non-empty set; maxDiff >= 0; differenceMetric > maxDiff

A -> B:

Sample [x, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...
RangeContents [x, y)
EndRound

A -> B:

Done

A: large set; B: non-empty set; sync priming; maxDiff >= 0; differenceMetric <= maxDiff (after priming)

A -> B:

ItemBatch
ItemBatch
...
Recent
EndRound

B -> A:

ItemBatch
ItemBatch
...
Sample [x, y)
EndRound

A -> B:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

A -> B:

Done

A: large set; B: non-empty set; sync priming; maxDiff < 0

A -> B:

ItemBatch
ItemBatch
...
Recent
EndRound

B -> A:

ItemBatch
ItemBatch
...
Fingerprint [x, y)
EndRound

A -> B:

Fingerprint [x, m)
Fingerprint [m, y)
EndRound

B -> A:

ItemBatch
ItemBatch
...
RangeContents [x, m)
EndRound

A -> B:

Done


Constants

const (
	DefaultMaxSendRange  = 1
	DefaultItemChunkSize = 1024
	DefaultSampleSize    = 200
)
const (
	// FingerprintSize is the size of a fingerprint in bytes.
	FingerprintSize = 12
)

Variables

var (
	ErrTrafficLimitExceeded = errors.New("sync traffic limit exceeded")
	ErrMessageLimitExceeded = errors.New("sync message limit exceeded")
)

Functions

func CalcSim

func CalcSim(a, b []MinhashSampleItem) float64

CalcSim estimates the Jaccard similarity coefficient between two sets based on their samples, each derived from the N lowest-valued elements of the corresponding set. The return value is in the 0..1 range, with 0 meaning almost no intersection and 1 meaning the sets are mostly equal. The precision of the estimate suffers if neither set is empty and the sets differ in size, in which case the return value tends to be lower than the actual Jaccard coefficient.
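To illustrate the idea behind CalcSim, here is a standard bottom-k MinHash estimator: it walks the k smallest distinct values of the union of both samples, where k is the smaller sample size, and counts how many of them appear in both samples. This formulation is an assumption and may differ from CalcSim's exact arithmetic; plain uint32 values stand in for MinhashSampleItem, and jaccardBottomK and the result returned for empty samples are hypothetical choices made for this sketch.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedCopy returns an ascending copy of s without modifying the input.
func sortedCopy(s []uint32) []uint32 {
	c := append([]uint32(nil), s...)
	sort.Slice(c, func(i, j int) bool { return c[i] < c[j] })
	return c
}

// jaccardBottomK estimates the Jaccard coefficient of two sets from their
// bottom-k samples (the smallest hash values of each set, without duplicates).
func jaccardBottomK(a, b []uint32) float64 {
	if len(a) == 0 || len(b) == 0 {
		if len(a) == len(b) {
			return 1 // both sets empty: treated as equal (assumption)
		}
		return 0 // exactly one set is empty
	}
	a, b = sortedCopy(a), sortedCopy(b)
	// Only the k smallest values of the union are known from both samples.
	k := len(a)
	if len(b) < k {
		k = len(b)
	}
	both := 0
	// Walk the merged samples in ascending order, visiting k distinct values.
	for i, j, seen := 0, 0, 0; seen < k; seen++ {
		switch {
		case a[i] < b[j]:
			i++
		case a[i] > b[j]:
			j++
		default: // value present in both samples
			both++
			i++
			j++
		}
	}
	return float64(both) / float64(k)
}

func main() {
	fmt.Println(jaccardBottomK([]uint32{1, 2, 3, 4}, []uint32{4, 3, 2, 1})) // 1
	fmt.Println(jaccardBottomK([]uint32{1, 2, 3, 4}, []uint32{2, 3, 4, 5})) // 0.75
	fmt.Println(jaccardBottomK([]uint32{1, 2}, []uint32{3, 4}))             // 0
}
```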

func SyncMessageToString

func SyncMessageToString(m SyncMessage) string

SyncMessageToString returns the string representation of a sync message.

Types

type CompactHash

type CompactHash struct {
	H KeyBytes
}

CompactHash encodes hashes in a compact form, skipping trailing zeroes. It also supports a nil hash (no value). The encoding format is as follows:

  - byte 0: spec byte
  - bytes 1..n: data bytes

The format of the spec byte is as follows:

  - bits 0..5: number of leading bytes stored (the hash length without its trailing zero bytes)
  - bits 6..7: hash type

The following hash types are supported:

  - 0: nil hash
  - 1: 32-byte hash
  - 2, 3: reserved

NOTE: when adding new hash types, we need to add a mechanism that makes sure that every received hash is of the expected type. Alternatively, we need to add some kind of context to the scale.Decoder / scale.Encoder, which may contain the size of hashes to be used.

func (*CompactHash) DecodeScale

func (c *CompactHash) DecodeScale(dec *scale.Decoder) (int, error)

DecodeScale implements scale.Decodable.

func (*CompactHash) EncodeScale

func (c *CompactHash) EncodeScale(enc *scale.Encoder) (int, error)

EncodeScale implements scale.Encodable.

func (*CompactHash) ToOrdered

func (c *CompactHash) ToOrdered() KeyBytes

type Conduit

type Conduit interface {
	// NextMessage returns the next SyncMessage, or nil if there are no more
	// SyncMessages for this session.
	NextMessage() (SyncMessage, error)
	// Send sends a SyncMessage to the peer.
	Send(SyncMessage) error
}

Conduit handles receiving and sending peer messages.
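In tests, a Conduit can be replaced by an in-memory queue. The sketch below is hypothetical: syncMessage is a local stand-in for the package's SyncMessage interface (only a type name is modelled), and sliceConduit follows the documented contract that NextMessage returns nil once there are no more messages for the session.

```go
package main

import "fmt"

// syncMessage is a stand-in for the package's SyncMessage interface.
type syncMessage interface{ Type() string }

// textMsg is a trivial message whose type is its own text.
type textMsg string

func (m textMsg) Type() string { return string(m) }

// sliceConduit replays queued incoming messages and records outgoing ones.
type sliceConduit struct {
	in  []syncMessage // messages "received" from the peer, in order
	out []syncMessage // messages "sent" to the peer
}

// NextMessage pops the next queued message, or returns nil, nil when the
// queue is exhausted, signalling the end of the session.
func (c *sliceConduit) NextMessage() (syncMessage, error) {
	if len(c.in) == 0 {
		return nil, nil
	}
	m := c.in[0]
	c.in = c.in[1:]
	return m, nil
}

// Send records the message instead of writing it to a network stream.
func (c *sliceConduit) Send(m syncMessage) error {
	c.out = append(c.out, m)
	return nil
}

func main() {
	c := &sliceConduit{in: []syncMessage{textMsg("EmptySet"), textMsg("EndRound")}}
	for {
		m, err := c.NextMessage()
		if err != nil || m == nil {
			break
		}
		fmt.Println("received", m.Type())
	}
	_ = c.Send(textMsg("Done"))
	fmt.Println(len(c.out)) // 1
}
```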

type Dispatcher

type Dispatcher struct {
	*server.Server
	// contains filtered or unexported fields
}

Dispatcher multiplexes a P2P Server to multiple set reconcilers.

func NewDispatcher

func NewDispatcher(logger *zap.Logger) *Dispatcher

NewDispatcher creates a new Dispatcher.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(
	ctx context.Context,
	peer p2p.Peer,
	req []byte,
	stream io.ReadWriter,
) (err error)

Dispatch dispatches a request to a handler.

func (*Dispatcher) Register

func (d *Dispatcher) Register(name string, h Handler)

Register registers a handler with a Dispatcher.

func (*Dispatcher) SetupServer

func (d *Dispatcher) SetupServer(
	host host.Host,
	proto string,
	opts ...server.Opt,
) *server.Server

SetupServer creates a new P2P Server for the Dispatcher.

type DoneMessage

type DoneMessage struct{ Marker }

DoneMessage is a SyncMessage that denotes the end of the synchronization. The peer should stop any further processing after receiving this message.

func (*DoneMessage) Type

func (*DoneMessage) Type() MessageType

type DumbSet added in v1.7.7

type DumbSet struct {
	FPFunc func(items []KeyBytes) Fingerprint
	// contains filtered or unexported fields
}

DumbSet is a simple OrderedSet implementation without any optimizations. It is intended for use in tests only.

func (*DumbSet) Add added in v1.7.7

func (ds *DumbSet) Add(id KeyBytes) error

Add implements the OrderedSet.

func (*DumbSet) AddReceived added in v1.7.7

func (ds *DumbSet) AddReceived()

AddReceived adds all the received items to the set.

func (*DumbSet) AddUnchecked added in v1.7.7

func (ds *DumbSet) AddUnchecked(id KeyBytes)

AddUnchecked adds an item to the set without registering the item for checks, as Add and Receive do.

func (*DumbSet) Advance added in v1.7.7

func (ds *DumbSet) Advance() error

Advance implements OrderedSet.

func (*DumbSet) EnsureLoaded added in v1.7.7

func (ds *DumbSet) EnsureLoaded() error

EnsureLoaded implements OrderedSet.

func (*DumbSet) Has added in v1.7.7

func (ds *DumbSet) Has(k KeyBytes) (bool, error)

Has implements OrderedSet.

func (*DumbSet) Loaded added in v1.7.9

func (ds *DumbSet) Loaded() bool

Loaded implements OrderedSet.

func (*DumbSet) RangeInfo added in v1.7.10

func (ds *DumbSet) RangeInfo(x, y KeyBytes) (RangeInfo, error)

RangeInfo implements OrderedSet.

func (*DumbSet) Receive added in v1.7.7

func (ds *DumbSet) Receive(id KeyBytes) error

Receive implements the OrderedSet.

func (*DumbSet) Received added in v1.7.7

func (ds *DumbSet) Received() SeqResult

Received implements the OrderedSet.

func (*DumbSet) Recent added in v1.7.7

func (ds *DumbSet) Recent(since time.Time) (SeqResult, int)

Recent implements OrderedSet.

func (*DumbSet) Release added in v1.7.7

func (ds *DumbSet) Release()

Release implements OrderedSet.

func (*DumbSet) SetAllowMultiReceive added in v1.7.7

func (ds *DumbSet) SetAllowMultiReceive(allow bool)

SetAllowMultiReceive sets whether the set allows receiving the same item multiple times.

func (*DumbSet) SetInfo added in v1.7.10

func (ds *DumbSet) SetInfo() (RangeInfo, error)

SetInfo implements OrderedSet.

func (*DumbSet) SplitRange added in v1.7.7

func (ds *DumbSet) SplitRange(x, y KeyBytes, count int) (SplitInfo, error)

SplitRange implements OrderedSet.

func (*DumbSet) WithCopy added in v1.7.9

func (ds *DumbSet) WithCopy(toCall func(OrderedSet) error) error

WithCopy implements OrderedSet.

type EmptyRangeMessage

type EmptyRangeMessage struct {
	RangeX, RangeY CompactHash
}

EmptyRangeMessage notifies the peer that it needs to send all of its items in the specified range.

func (*EmptyRangeMessage) Count

func (m *EmptyRangeMessage) Count() int

func (*EmptyRangeMessage) DecodeScale

func (t *EmptyRangeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*EmptyRangeMessage) EncodeScale

func (t *EmptyRangeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*EmptyRangeMessage) Fingerprint

func (m *EmptyRangeMessage) Fingerprint() Fingerprint

func (*EmptyRangeMessage) Keys

func (m *EmptyRangeMessage) Keys() []KeyBytes

func (*EmptyRangeMessage) Sample

func (m *EmptyRangeMessage) Sample() []MinhashSampleItem

func (*EmptyRangeMessage) Since

func (m *EmptyRangeMessage) Since() time.Time

func (*EmptyRangeMessage) Type

func (m *EmptyRangeMessage) Type() MessageType

func (*EmptyRangeMessage) X

func (m *EmptyRangeMessage) X() KeyBytes

func (*EmptyRangeMessage) Y

func (m *EmptyRangeMessage) Y() KeyBytes

type EmptySetMessage

type EmptySetMessage struct{ Marker }

EmptySetMessage is a SyncMessage that denotes an empty set, requesting the peer to send all of its items.

func (*EmptySetMessage) Type

func (*EmptySetMessage) Type() MessageType

type EndRoundMessage

type EndRoundMessage struct{ Marker }

EndRoundMessage is a SyncMessage that denotes the end of the sync round.

func (*EndRoundMessage) Type

func (*EndRoundMessage) Type() MessageType

type Fingerprint

type Fingerprint [FingerprintSize]byte

Fingerprint represents a fingerprint of a set of keys. The fingerprint is obtained by XORing together the keys in the set.

func CombineFingerprints added in v1.7.7

func CombineFingerprints(a, b Fingerprint) Fingerprint

CombineFingerprints combines two fingerprints into one.

func EmptyFingerprint

func EmptyFingerprint() Fingerprint

EmptyFingerprint returns an empty fingerprint.

func MustParseHexFingerprint

func MustParseHexFingerprint(s string) Fingerprint

MustParseHexFingerprint converts a hex string to Fingerprint.

func RandomFingerprint

func RandomFingerprint() Fingerprint

RandomFingerprint generates a random fingerprint.

func (*Fingerprint) BitFromLeft

func (fp *Fingerprint) BitFromLeft(i int) bool

BitFromLeft returns the i-th bit from the left in the fingerprint.

func (Fingerprint) Compare

func (fp Fingerprint) Compare(other Fingerprint) int

Compare compares two fingerprints.

func (Fingerprint) ShortString

func (fp Fingerprint) ShortString() string

ShortString implements log.ShortString.

func (Fingerprint) String

func (fp Fingerprint) String() string

String implements fmt.Stringer.

func (*Fingerprint) Update

func (fp *Fingerprint) Update(h []byte)

Update includes the byte slice in the fingerprint.

type FingerprintMessage

type FingerprintMessage struct {
	RangeX, RangeY   CompactHash
	RangeFingerprint Fingerprint
	NumItems         uint32
}

FingerprintMessage contains range fingerprint for comparison against the peer's fingerprint of the range with the same bounds [RangeX, RangeY).

func (*FingerprintMessage) Count

func (m *FingerprintMessage) Count() int

func (*FingerprintMessage) DecodeScale

func (t *FingerprintMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*FingerprintMessage) EncodeScale

func (t *FingerprintMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*FingerprintMessage) Fingerprint

func (m *FingerprintMessage) Fingerprint() Fingerprint

func (*FingerprintMessage) Keys

func (m *FingerprintMessage) Keys() []KeyBytes

func (*FingerprintMessage) Sample

func (m *FingerprintMessage) Sample() []MinhashSampleItem

func (*FingerprintMessage) Since

func (m *FingerprintMessage) Since() time.Time

func (*FingerprintMessage) Type

func (m *FingerprintMessage) Type() MessageType

func (*FingerprintMessage) X

func (*FingerprintMessage) Y

type Handler

type Handler func(context.Context, p2p.Peer, io.ReadWriter) error

Handler is a function that handles a request for a Dispatcher.

type ItemBatchMessage

type ItemBatchMessage struct {
	ContentKeys KeyCollection `scale:"max=1024"`
}

ItemBatchMessage denotes a batch of items to be added to the peer's set.

func (*ItemBatchMessage) Count

func (m *ItemBatchMessage) Count() int

func (*ItemBatchMessage) DecodeScale

func (t *ItemBatchMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*ItemBatchMessage) EncodeScale

func (t *ItemBatchMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*ItemBatchMessage) Fingerprint

func (m *ItemBatchMessage) Fingerprint() Fingerprint

func (*ItemBatchMessage) Keys

func (m *ItemBatchMessage) Keys() []KeyBytes

func (*ItemBatchMessage) Sample

func (m *ItemBatchMessage) Sample() []MinhashSampleItem

func (*ItemBatchMessage) Since

func (m *ItemBatchMessage) Since() time.Time

func (*ItemBatchMessage) Type

func (m *ItemBatchMessage) Type() MessageType

func (*ItemBatchMessage) X

func (m *ItemBatchMessage) X() KeyBytes

func (*ItemBatchMessage) Y

func (m *ItemBatchMessage) Y() KeyBytes

type KeyBytes

type KeyBytes []byte

KeyBytes represents an item (key) in a reconcilable set.

func MustParseHexKeyBytes

func MustParseHexKeyBytes(s string) KeyBytes

MustParseHexKeyBytes converts a hex string to KeyBytes.

func RandomKeyBytes

func RandomKeyBytes(size int) KeyBytes

RandomKeyBytes generates random data in bytes for testing.

func (KeyBytes) BitFromLeft added in v1.7.7

func (k KeyBytes) BitFromLeft(i int) bool

BitFromLeft returns the i-th bit from the left in the key.

func (KeyBytes) Clone

func (k KeyBytes) Clone() KeyBytes

Clone returns a copy of the key.

func (KeyBytes) Compare

func (k KeyBytes) Compare(other KeyBytes) int

Compare compares two keys.

func (KeyBytes) Inc

func (k KeyBytes) Inc() (overflow bool)

Inc increments the key by one in place, keeping the same number of bytes. It returns true if the increment caused an overflow.
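A sketch of the increment described for Inc, assuming keys are interpreted as fixed-width big-endian unsigned integers (consistent with byte-wise key comparison). Since the method's signature returns only an overflow flag, the sketch modifies the key in place; inc is a hypothetical stand-in, not the package's implementation.

```go
package main

import "fmt"

// inc adds one to k, treated as a big-endian unsigned integer of fixed width.
// It reports whether the value wrapped around to all zero bytes.
func inc(k []byte) (overflow bool) {
	for i := len(k) - 1; i >= 0; i-- {
		k[i]++
		if k[i] != 0 {
			return false // no carry into the next byte
		}
	}
	return true // every byte wrapped from 0xff to 0x00
}

func main() {
	k := []byte{0x00, 0xff}
	overflow := inc(k)
	fmt.Printf("%x %v\n", k, overflow) // 0100 false

	m := []byte{0xff, 0xff}
	overflow = inc(m)
	fmt.Printf("%x %v\n", m, overflow) // 0000 true
}
```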

func (KeyBytes) IsZero

func (k KeyBytes) IsZero() bool

IsZero returns true if all bytes in the key are zero.

func (KeyBytes) ShortString

func (k KeyBytes) ShortString() string

ShortString implements log.ShortString.

func (KeyBytes) String

func (k KeyBytes) String() string

String implements fmt.Stringer.

func (KeyBytes) Trim added in v1.7.7

func (k KeyBytes) Trim(bit int)

Trim zeroes all the bits in the key starting with the given bit index.
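BitFromLeft and Trim both count bits from the left. The sketch below assumes this means starting at the most significant bit of byte 0; bitFromLeft and trim are hypothetical helpers operating on plain byte slices, not the package's methods.

```go
package main

import "fmt"

// bitFromLeft returns bit i of k, counting from the most significant bit of k[0].
func bitFromLeft(k []byte, i int) bool {
	return k[i/8]&(0x80>>(i%8)) != 0
}

// trim zeroes every bit of k from index bit (counted from the left) onwards.
func trim(k []byte, bit int) {
	byteIdx := bit / 8
	if byteIdx >= len(k) {
		return // nothing to clear
	}
	// Keep only the top bit%8 bits of the partially cleared byte.
	k[byteIdx] &= ^byte(0xff >> (bit % 8))
	for i := byteIdx + 1; i < len(k); i++ {
		k[i] = 0
	}
}

func main() {
	k := []byte{0xff, 0xff}
	trim(k, 10)
	fmt.Printf("%x\n", k)                               // ffc0
	fmt.Println(bitFromLeft(k, 9), bitFromLeft(k, 10)) // true false
}
```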

func (KeyBytes) Zero

func (k KeyBytes) Zero()

Zero sets all bytes in the key to zero.

type KeyCollection

type KeyCollection struct {
	Keys []KeyBytes
}

KeyCollection represents a collection of keys of the same size.

func (*KeyCollection) DecodeScale

func (c *KeyCollection) DecodeScale(dec *scale.Decoder) (int, error)

DecodeScale implements scale.Decodable.

func (*KeyCollection) EncodeScale

func (c *KeyCollection) EncodeScale(enc *scale.Encoder) (int, error)

EncodeScale implements scale.Encodable.

type Marker

type Marker struct{}

func (*Marker) Count

func (*Marker) Count() int

func (*Marker) Fingerprint

func (*Marker) Fingerprint() Fingerprint

func (*Marker) Keys

func (*Marker) Keys() []KeyBytes

func (*Marker) Sample

func (*Marker) Sample() []MinhashSampleItem

func (*Marker) Since

func (*Marker) Since() time.Time

func (*Marker) X

func (*Marker) X() KeyBytes

func (*Marker) Y

func (*Marker) Y() KeyBytes

type MessageType

type MessageType byte

MessageType specifies the type of a sync message.

const (
	// Done message is sent to indicate the completion of the whole sync run.
	MessageTypeDone MessageType = iota
	// EndRoundMessage is sent to indicate the completion of a single round.
	MessageTypeEndRound
	// EmptySetMessage is sent to indicate that the set is empty. In response, the
	// receiving side sends its whole set using ItemBatch and RangeContents messages.
	MessageTypeEmptySet
	// EmptyRangeMessage is sent to indicate that the specified range is empty. In
	// response, the receiving side sends the contents of the range using ItemBatch
	// and RangeContents messages.
	MessageTypeEmptyRange
	// Fingerprint carries a range fingerprint. Depending on the local fingerprint of
	// the same range, the receiving side may not reply to this message (range
	// completed), may send the contents of the range using ItemBatch and
	// RangeContents messages if the range is small enough, or it can split the range
	// in two and send back the Fingerprint messages for each of resulting parts.
	MessageTypeFingerprint
	// RangeContents message is sent after ItemBatch messages to indicate what range
	// the messages belong to. If the receiving side has any items within the same
	// range, these items are sent back using ItemBatch and RangeContents messages.
	MessageTypeRangeContents
	// ItemBatchMessage is sent to carry a batch of items.
	MessageTypeItemBatch
	// ProbeMessage is sent to request the count of items and an estimate of the
	// Jaccard similarity coefficient for the specified range.
	MessageTypeProbe
	// Sample message carries a minhash sample along with the fingerprint and
	// number of items within the specified range.
	MessageTypeSample
	// Recent message is sent after ItemBatch messages to indicate that the batches
	// contain items recently added to the set.
	MessageTypeRecent
)

func (MessageType) String

func (mtype MessageType) String() string

String implements Stringer.

type MinhashSampleItem

type MinhashSampleItem uint32

MinhashSampleItem represents an item of minhash sample subset.

func MinhashSampleItemFromKeyBytes

func MinhashSampleItemFromKeyBytes(h KeyBytes) MinhashSampleItem

MinhashSampleItemFromKeyBytes uses lower 32 bits of a hash as a MinhashSampleItem.

func Sample

func Sample(sr SeqResult, count, sampleSize int) ([]MinhashSampleItem, error)

Sample retrieves min(count, sampleSize) items from the ordered sequence, extracting a MinhashSampleItem from each value.

func (MinhashSampleItem) Compare

func (m MinhashSampleItem) Compare(other MinhashSampleItem) int

func (*MinhashSampleItem) DecodeScale

func (m *MinhashSampleItem) DecodeScale(d *scale.Decoder) (int, error)

DecodeScale implements scale.Decodable.

func (MinhashSampleItem) EncodeScale

func (m MinhashSampleItem) EncodeScale(e *scale.Encoder) (int, error)

EncodeScale implements scale.Encodable.

func (MinhashSampleItem) String

func (m MinhashSampleItem) String() string

type OrderedSet

type OrderedSet interface {
	// Add adds a new key to the set.
	// It should not perform any additional actions related to handling
	// the received key.
	Add(k KeyBytes) error

	// Receive handles a new key received from the peer.
	// It should not add the key to the set.
	Receive(k KeyBytes) error

	// Received returns the sequence containing all the items received from the peer.
	// Unlike other methods, SeqResult returned by Received called on a copy of the
	// OrderedSet passed to WithCopy callback is expected to be valid outside of the
	// callback as well.
	Received() SeqResult

	// RangeInfo returns RangeInfo for the item range in the ordered set,
	// bounded by [x, y).
	// x == y indicates the whole set.
	// x < y indicates a normal range starting with x and ending below y.
	// x > y indicates a wrapped around range, that is from x (inclusive) to the end
	// of the set and from the beginning of the set to y, non-inclusive.
	// If count >= 0, at most count items are returned, and RangeInfo
	// is returned for the corresponding subrange of the requested range.
	RangeInfo(x, y KeyBytes) (RangeInfo, error)

	// SplitRange splits the range roughly after the specified count of items,
	// returning RangeInfo for the first half and the second half of the range.
	SplitRange(x, y KeyBytes, count int) (SplitInfo, error)

	// SetInfo returns RangeInfo for the whole set.
	SetInfo() (RangeInfo, error)

	// WithCopy runs the specified function, passing to it a temporary shallow copy of
	// the OrderedSet. The copy is discarded after the function returns, releasing
	// any resources associated with it.
	// The list of received items as returned by Received is inherited by the copy.
	WithCopy(toCall func(OrderedSet) error) error

	// Recent returns an Iterator that yields the items added since the specified
	// timestamp. Some OrderedSet implementations may not have Recent implemented, in
	// which case it should return an empty sequence.
	Recent(since time.Time) (SeqResult, int)

	// Loaded returns true if the set is loaded and ready for use.
	Loaded() bool

	// EnsureLoaded ensures that the set is loaded and ready for use.
	// It may do nothing in case of in-memory sets, but may trigger loading
	// from database in case of database-backed sets.
	EnsureLoaded() error

	// Advance advances the set by including the items since the set was last loaded
	// or advanced.
	Advance() error

	// Has returns true if the specified key is present in OrderedSet.
	Has(KeyBytes) (bool, error)
}

OrderedSet represents a set that can be synced against a remote peer. OrderedSet methods are not thread-safe, except for WithCopy, Loaded and EnsureLoaded. SeqResult values obtained by method calls on an OrderedSet passed to a WithCopy callback are valid only within the callback and should not be used outside of it, with the exception of the SeqResult returned by Received, which is expected to remain valid outside of the callback as well.

type PairwiseSetSyncer

type PairwiseSetSyncer struct {
	// contains filtered or unexported fields
}

func NewPairwiseSetSyncer

func NewPairwiseSetSyncer(
	logger *zap.Logger,
	r Requester,
	name string,
	cfg RangeSetReconcilerConfig,
) *PairwiseSetSyncer

func NewPairwiseSetSyncerInternal added in v1.7.7

func NewPairwiseSetSyncerInternal(
	logger *zap.Logger,
	r Requester,
	name string,
	cfg RangeSetReconcilerConfig,
	tracer Tracer,
	clock clockwork.Clock,
) *PairwiseSetSyncer

func (*PairwiseSetSyncer) Probe

func (pss *PairwiseSetSyncer) Probe(
	ctx context.Context,
	peer p2p.Peer,
	os OrderedSet,
	x, y KeyBytes,
) (pr ProbeResult, err error)

func (*PairwiseSetSyncer) Received

func (pss *PairwiseSetSyncer) Received() int

func (*PairwiseSetSyncer) Register

func (pss *PairwiseSetSyncer) Register(d *Dispatcher, os OrderedSet)

func (*PairwiseSetSyncer) Sent

func (pss *PairwiseSetSyncer) Sent() int

func (*PairwiseSetSyncer) Serve

func (pss *PairwiseSetSyncer) Serve(ctx context.Context, stream io.ReadWriter, os OrderedSet) error

func (*PairwiseSetSyncer) Sync

func (pss *PairwiseSetSyncer) Sync(
	ctx context.Context,
	peer p2p.Peer,
	os OrderedSet,
	x, y KeyBytes,
) error

type ProbeMessage

type ProbeMessage struct {
	RangeX, RangeY   CompactHash
	RangeFingerprint Fingerprint
	SampleSize       uint32
}

ProbeMessage requests bounded range fingerprint and count from the peer, along with a minhash sample if fingerprints differ.

func (*ProbeMessage) Count

func (m *ProbeMessage) Count() int

func (*ProbeMessage) DecodeScale

func (t *ProbeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*ProbeMessage) EncodeScale

func (t *ProbeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*ProbeMessage) Fingerprint

func (m *ProbeMessage) Fingerprint() Fingerprint

func (*ProbeMessage) Keys

func (m *ProbeMessage) Keys() []KeyBytes

func (*ProbeMessage) Sample

func (m *ProbeMessage) Sample() []MinhashSampleItem

func (*ProbeMessage) Since

func (m *ProbeMessage) Since() time.Time

func (*ProbeMessage) Type

func (m *ProbeMessage) Type() MessageType

func (*ProbeMessage) X

func (m *ProbeMessage) X() KeyBytes

func (*ProbeMessage) Y

func (m *ProbeMessage) Y() KeyBytes

type ProbeResult

type ProbeResult struct {
	// True if the peer's range (or full set) is fully in sync with the local range
	// (or full set).
	// Note that Sim==1 does not guarantee that the peer is in sync because the
	// minhash algorithm is not precise.
	InSync bool
	// Number of items in the range.
	Count int
	// An estimate of Jaccard similarity coefficient between the sets.
	// The range is 0..1, 0 being mostly disjoint sets and 1 being mostly equal sets.
	Sim float64
}

ProbeResult contains the result of a probe.

type RangeContentsMessage

type RangeContentsMessage struct {
	RangeX, RangeY CompactHash
	NumItems       uint32
}

RangeContentsMessage denotes a range for which the set of items has been sent. The peer needs to send back any items it has in the same range bounded by [RangeX, RangeY).

func (*RangeContentsMessage) Count

func (m *RangeContentsMessage) Count() int

func (*RangeContentsMessage) DecodeScale

func (t *RangeContentsMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*RangeContentsMessage) EncodeScale

func (t *RangeContentsMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*RangeContentsMessage) Fingerprint

func (m *RangeContentsMessage) Fingerprint() Fingerprint

func (*RangeContentsMessage) Keys

func (m *RangeContentsMessage) Keys() []KeyBytes

func (*RangeContentsMessage) Sample

func (*RangeContentsMessage) Since

func (m *RangeContentsMessage) Since() time.Time

func (*RangeContentsMessage) Type

func (*RangeContentsMessage) X

func (*RangeContentsMessage) Y

type RangeInfo

type RangeInfo struct {
	// Fingerprint of the interval
	Fingerprint Fingerprint
	// Number of items in the interval
	Count int
	// Items is the sequence of set elements in the interval.
	Items SeqResult
}

RangeInfo contains information about a range of items in the OrderedSet as returned by OrderedSet.RangeInfo.

type RangeSetReconciler

type RangeSetReconciler struct {
	// contains filtered or unexported fields
}

RangeSetReconciler reconciles two sets of items using the recursive set reconciliation protocol.

func NewRangeSetReconciler

func NewRangeSetReconciler(logger *zap.Logger, cfg RangeSetReconcilerConfig, os OrderedSet) *RangeSetReconciler

NewRangeSetReconciler creates a new RangeSetReconciler.

func NewRangeSetReconcilerInternal added in v1.7.7

func NewRangeSetReconcilerInternal(
	logger *zap.Logger,
	cfg RangeSetReconcilerConfig,
	os OrderedSet,
	tracer Tracer,
	clock clockwork.Clock,
) *RangeSetReconciler

NewRangeSetReconcilerInternal creates a new RangeSetReconciler. It is only directly called by the tests. It accepts extra tracer and clock parameters.

func (*RangeSetReconciler) HandleProbeResponse

func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit, info RangeInfo) (pr ProbeResult, err error)

HandleProbeResponse processes the probe response message and returns the probe result. info is the range info returned by InitiateProbe.

func (*RangeSetReconciler) Initiate

func (rsr *RangeSetReconciler) Initiate(c Conduit, x, y KeyBytes) error

Initiate initiates the reconciliation process with the peer. If x and y are non-nil, [x, y) range is reconciled. If x and y are nil, the whole range is reconciled.

func (*RangeSetReconciler) InitiateProbe

func (rsr *RangeSetReconciler) InitiateProbe(
	c Conduit,
	x, y KeyBytes,
) (RangeInfo, error)

InitiateProbe initiates a probe to retrieve the item count and Jaccard similarity coefficient from the peer.

func (*RangeSetReconciler) Run

func (rsr *RangeSetReconciler) Run(c Conduit) error

Run performs a sync reconciliation run using the specified Conduit to send and receive messages.

type RangeSetReconcilerConfig added in v1.7.7

type RangeSetReconcilerConfig struct {
	// Maximum range size to send instead of further subdividing the input range.
	MaxSendRange uint `mapstructure:"max-send-range"`
	// Size of the item chunk to use when sending the set items.
	ItemChunkSize int `mapstructure:"item-chunk-size"`
	// Size of the MinHash sample to be sent to the peer.
	SampleSize uint `mapstructure:"sample-size"`
	// Maximum set difference metric (0..1) allowed for recursive reconciliation, with
	// a value of 0 meaning equal sets and 1 meaning completely disjoint sets. If the
	// difference metric exceeds MaxReconcDiff, the whole set is transmitted instead of
	// applying the recursive algorithm.
	MaxReconcDiff float64 `mapstructure:"max-reconc-diff"`
	// Time span for recent sync.
	RecentTimeSpan time.Duration `mapstructure:"recent-time-span"`
	// Traffic limit in bytes.
	TrafficLimit int `mapstructure:"traffic-limit"`
	// Message count limit.
	MessageLimit int `mapstructure:"message-limit"`
}
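The per-range decision that MaxSendRange and MaxReconcDiff control, as suggested by the sample logs in the overview, can be sketched as follows. chooseAction is hypothetical; it assumes the difference metric is 1 minus the Jaccard estimate from a Sample message and that a range of at most MaxSendRange items is sent whole (the exact boundary is an assumption). A negative MaxReconcDiff disables the sample-based shortcut, matching the maxDiff < 0 logs.

```go
package main

import "fmt"

// Hypothetical outcomes for a single range.
const (
	actionDone      = "range in sync, nothing to send"
	actionSendItems = "send ItemBatch + RangeContents"
	actionSplit     = "split range, send Fingerprint for each half"
)

// chooseAction is an illustrative helper, not part of the package API.
// fpEqual: local and remote fingerprints for the range match.
// count: number of local items in the range.
// sim: Jaccard estimate from the peer's Sample (ignored when maxReconcDiff < 0).
func chooseAction(fpEqual bool, count int, maxSendRange uint, sim, maxReconcDiff float64) string {
	switch {
	case fpEqual:
		return actionDone
	case count <= int(maxSendRange):
		// Small ranges are cheaper to send whole than to subdivide.
		return actionSendItems
	case maxReconcDiff >= 0 && 1-sim > maxReconcDiff:
		// Sets too different for recursion to pay off: send the whole range.
		return actionSendItems
	default:
		return actionSplit
	}
}

func main() {
	fmt.Println(chooseAction(false, 5000, 1, 0.2, 0.5))  // send ItemBatch + RangeContents
	fmt.Println(chooseAction(false, 5000, 1, 0.95, 0.5)) // split range, send Fingerprint for each half
}
```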

func DefaultConfig added in v1.7.7

func DefaultConfig() RangeSetReconcilerConfig

DefaultConfig returns the default configuration for the RangeSetReconciler.

func (*RangeSetReconcilerConfig) Validate added in v1.7.9

func (cfg *RangeSetReconcilerConfig) Validate(logger *zap.Logger) bool

type RecentMessage

type RecentMessage struct {
	SinceTime uint64
}

RecentMessage is a SyncMessage that denotes a set of items that have been added to the peer's set since a specific point in time.

func (*RecentMessage) Count

func (m *RecentMessage) Count() int

func (*RecentMessage) DecodeScale

func (t *RecentMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*RecentMessage) EncodeScale

func (t *RecentMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*RecentMessage) Fingerprint

func (m *RecentMessage) Fingerprint() Fingerprint

func (*RecentMessage) Keys

func (m *RecentMessage) Keys() []KeyBytes

func (*RecentMessage) Sample

func (m *RecentMessage) Sample() []MinhashSampleItem

func (*RecentMessage) Since

func (m *RecentMessage) Since() time.Time

func (*RecentMessage) Type

func (m *RecentMessage) Type() MessageType

func (*RecentMessage) X

func (m *RecentMessage) X() KeyBytes

func (*RecentMessage) Y

func (m *RecentMessage) Y() KeyBytes

type Requester

type Requester interface {
	Run(context.Context) error
	StreamRequest(context.Context, p2p.Peer, []byte, server.StreamRequestCallback, ...string) error
}

type SampleMessage

type SampleMessage struct {
	RangeX, RangeY   CompactHash
	RangeFingerprint Fingerprint
	NumItems         uint32
	// NOTE: max must be in sync with maxSampleSize in hashsync/rangesync.go
	SampleItems []MinhashSampleItem `scale:"max=1000"`
}

SampleMessage is a sample of set items.
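SampleMessage carries a minhash sample of a range. The sketch below shows one common minhash variant, a bottom-k sample, and how two such samples can estimate the Jaccard similarity of the underlying sets. This is an illustration of the general technique, not rangesync's implementation. The sample items are assumed to be plain 64-bit hashes, and MinhashSampleItem may be laid out differently. Keys are hashed with FNV-1a only for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashKey maps a key to a 64-bit value using FNV-1a (an illustrative choice).
func hashKey(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key)) // writes to a hash.Hash never return an error
	return h.Sum64()
}

// bottomK returns the k smallest distinct hash values of keys, sorted ascending.
func bottomK(keys []string, k int) []uint64 {
	seen := make(map[uint64]bool)
	var hs []uint64
	for _, key := range keys {
		if v := hashKey(key); !seen[v] {
			seen[v] = true
			hs = append(hs, v)
		}
	}
	sort.Slice(hs, func(i, j int) bool { return hs[i] < hs[j] })
	if len(hs) > k {
		hs = hs[:k]
	}
	return hs
}

// estimateJaccard estimates |A∩B| / |A∪B| from bottom-k samples of A and B.
func estimateJaccard(sa, sb []uint64, k int) float64 {
	inA := make(map[uint64]bool, len(sa))
	for _, v := range sa {
		inA[v] = true
	}
	inB := make(map[uint64]bool, len(sb))
	for _, v := range sb {
		inB[v] = true
	}
	// The k smallest values of the combined samples are the k smallest of A∪B.
	var union []uint64
	for v := range inA {
		union = append(union, v)
	}
	for v := range inB {
		if !inA[v] {
			union = append(union, v)
		}
	}
	sort.Slice(union, func(i, j int) bool { return union[i] < union[j] })
	if len(union) > k {
		union = union[:k]
	}
	if len(union) == 0 {
		return 1 // two empty sets are identical
	}
	shared := 0
	for _, v := range union {
		if inA[v] && inB[v] {
			shared++
		}
	}
	return float64(shared) / float64(len(union))
}

func main() {
	const k = 64
	var a, b []string
	for i := 0; i < 1000; i++ {
		a = append(a, fmt.Sprintf("key-%d", i))
		b = append(b, fmt.Sprintf("key-%d", i+500)) // half of A overlaps B
	}
	j := estimateJaccard(bottomK(a, k), bottomK(b, k), k)
	// A difference metric such as 1 - j might then be compared with a
	// threshold like MaxReconcDiff (an assumption about how it is used).
	fmt.Printf("estimated Jaccard %.2f, difference %.2f\n", j, 1-j)
}
```

A sample of fixed size k keeps the message bounded regardless of the range size. The `scale:"max=1000"` tag on SampleItems suggests a similar cap in the real message.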

func (*SampleMessage) Count

func (m *SampleMessage) Count() int

func (*SampleMessage) DecodeScale

func (t *SampleMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*SampleMessage) EncodeScale

func (t *SampleMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*SampleMessage) Fingerprint

func (m *SampleMessage) Fingerprint() Fingerprint

func (*SampleMessage) Keys

func (m *SampleMessage) Keys() []KeyBytes

func (*SampleMessage) Sample

func (m *SampleMessage) Sample() []MinhashSampleItem

func (*SampleMessage) Since

func (m *SampleMessage) Since() time.Time

func (*SampleMessage) Type

func (m *SampleMessage) Type() MessageType

func (*SampleMessage) X

func (m *SampleMessage) X() KeyBytes

func (*SampleMessage) Y

func (m *SampleMessage) Y() KeyBytes

type Seq

type Seq iter.Seq[KeyBytes]

Seq represents an ordered sequence of elements. Most sequences are finite. Infinite sequences are explicitly mentioned in the documentation of functions/methods that return them.
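Because Seq is an iter.Seq[KeyBytes], its underlying type is `func(yield func(KeyBytes) bool)`. The sketch below redeclares that shape locally so it stays self-contained. It shows a finite and an infinite sequence, and a FirstN-style helper that stops early and is therefore safe on infinite sequences. sliceSeq, cycleSeq and firstN are hypothetical helpers, not the package's own implementations.

```go
package main

import "fmt"

// KeyBytes stands in for rangesync.KeyBytes (assumed to be a byte slice).
type KeyBytes []byte

// Seq mirrors iter.Seq[KeyBytes], i.e. func(yield func(KeyBytes) bool).
type Seq func(yield func(KeyBytes) bool)

// sliceSeq yields the slice elements in order (a finite sequence).
func sliceSeq(items []KeyBytes) Seq {
	return func(yield func(KeyBytes) bool) {
		for _, k := range items {
			if !yield(k) {
				return
			}
		}
	}
}

// cycleSeq repeats items forever (an infinite sequence), similar in spirit
// to iterating an ordered set with wrap-around.
func cycleSeq(items []KeyBytes) Seq {
	return func(yield func(KeyBytes) bool) {
		if len(items) == 0 {
			return
		}
		for i := 0; ; i = (i + 1) % len(items) {
			if !yield(items[i]) {
				return
			}
		}
	}
}

// firstN stops the iteration after n elements, so it terminates on infinite sequences.
func firstN(s Seq, n int) []KeyBytes {
	if n <= 0 {
		return nil
	}
	var out []KeyBytes
	s(func(k KeyBytes) bool {
		out = append(out, k)
		return len(out) < n
	})
	return out
}

func main() {
	items := []KeyBytes{KeyBytes("a"), KeyBytes("b"), KeyBytes("c")}
	fmt.Println(len(firstN(sliceSeq(items), 10))) // 3
	s := ""
	for _, k := range firstN(cycleSeq(items), 5) {
		s += string(k)
	}
	fmt.Println(s) // abcab
}
```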

func EmptySeq

func EmptySeq() Seq

EmptySeq returns an empty sequence.

func (Seq) Collect

func (s Seq) Collect() []KeyBytes

Collect returns all elements in the sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.

func (Seq) First

func (s Seq) First() KeyBytes

First returns the first element from the sequence, if any. If the sequence is empty, it returns nil.

func (Seq) FirstN

func (s Seq) FirstN(n int) []KeyBytes

FirstN returns the first n elements from the sequence.

func (Seq) Limit added in v1.7.9

func (s Seq) Limit(n int) Seq

Limit limits the sequence to at most n elements.
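The following is a minimal sketch of how a Limit-style wrapper can be built over the `func(yield func(KeyBytes) bool)` shape of Seq. The helpers limit, naturals and collect are hypothetical. The counter is declared inside the returned function, so each new iteration starts counting from zero and the limited sequence can be iterated more than once.

```go
package main

import "fmt"

type KeyBytes []byte

// Seq mirrors iter.Seq[KeyBytes].
type Seq func(yield func(KeyBytes) bool)

// limit returns a Seq yielding at most n elements of s.
func limit(s Seq, n int) Seq {
	return func(yield func(KeyBytes) bool) {
		if n <= 0 {
			return
		}
		i := 0 // reset on every iteration
		s(func(k KeyBytes) bool {
			if !yield(k) {
				return false // the consumer stopped early
			}
			i++
			return i < n
		})
	}
}

// naturals is an infinite sequence of one-byte keys 0, 1, 2, ...
func naturals() Seq {
	return func(yield func(KeyBytes) bool) {
		for i := byte(0); ; i++ {
			if !yield(KeyBytes{i}) {
				return
			}
		}
	}
}

// collect gathers all elements; only use it on finite sequences.
func collect(s Seq) []KeyBytes {
	var out []KeyBytes
	s(func(k KeyBytes) bool {
		out = append(out, k)
		return true
	})
	return out
}

func main() {
	l := limit(naturals(), 3)
	fmt.Println(len(collect(l)), len(collect(l))) // 3 3
}
```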

func (Seq) MarshalLogArray

func (s Seq) MarshalLogArray(enc zapcore.ArrayEncoder) error

MarshalLogArray implements zapcore.ArrayMarshaler.

type SeqErrorFunc

type SeqErrorFunc func() error

SeqErrorFunc is a function that returns an error that happened during iteration, if any.

var NoSeqError SeqErrorFunc = func() error { return nil }

NoSeqError is a SeqErrorFunc that always returns nil (no error).

func SeqError

func SeqError(err error) SeqErrorFunc

SeqError returns a SeqErrorFunc that always returns the given error.

type SeqResult

type SeqResult struct {
	Seq   Seq
	Error SeqErrorFunc
}

SeqResult represents the result of a function that returns a sequence. The Error method must be called after processing the sequence to check whether an error occurred. The error is reset at the beginning of each Seq call (iteration over the sequence).

func CombineSeqs added in v1.7.7

func CombineSeqs(startingPoint KeyBytes, srs ...SeqResult) SeqResult

CombineSeqs combines multiple ordered sequences from SeqResults into one, returning the smallest current key among all iterators at each step. startingPoint is used to check if an iterator has wrapped around. If an iterator yields a value below startingPoint, it is considered to have wrapped around.
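The following is a simplified sketch of the merge that CombineSeqs describes. It works on slices rather than lazy SeqResults and assumes that each input makes at most one lap over its set. A key below startingPoint counts as wrapped around, and wrapped keys sort after all keys at or above startingPoint. The helpers wrapLess, combine and keys are hypothetical, and this sketch keeps duplicates, whether or not the real function does.

```go
package main

import (
	"bytes"
	"fmt"
)

type KeyBytes []byte

// wrapLess orders keys relative to start: keys >= start come first in
// ascending order, followed by wrapped keys (< start), also ascending.
func wrapLess(start, a, b KeyBytes) bool {
	aWrapped := bytes.Compare(a, start) < 0
	bWrapped := bytes.Compare(b, start) < 0
	if aWrapped != bWrapped {
		return !aWrapped
	}
	return bytes.Compare(a, b) < 0
}

// combine repeatedly emits the smallest current head among all inputs.
func combine(start KeyBytes, inputs ...[]KeyBytes) []KeyBytes {
	pos := make([]int, len(inputs))
	var out []KeyBytes
	for {
		best := -1
		for i, in := range inputs {
			if pos[i] >= len(in) {
				continue // this input is exhausted
			}
			if best < 0 || wrapLess(start, in[pos[i]], inputs[best][pos[best]]) {
				best = i
			}
		}
		if best < 0 {
			return out
		}
		out = append(out, inputs[best][pos[best]])
		pos[best]++
	}
}

// keys builds a slice of KeyBytes from strings.
func keys(ss ...string) []KeyBytes {
	out := make([]KeyBytes, len(ss))
	for i, s := range ss {
		out[i] = KeyBytes(s)
	}
	return out
}

func main() {
	// Both inputs start at or after "m" and wrap around to smaller keys.
	merged := combine(KeyBytes("m"), keys("n", "t", "b"), keys("p", "a", "c"))
	s := ""
	for _, k := range merged {
		s += string(k)
	}
	fmt.Println(s) // nptabc
}
```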

func EmptySeqResult

func EmptySeqResult() SeqResult

EmptySeqResult returns an empty sequence result.

func ErrorSeqResult

func ErrorSeqResult(err error) SeqResult

ErrorSeqResult returns a sequence result with an empty sequence and an error.

func MakeSeqResult added in v1.7.9

func MakeSeqResult(items []KeyBytes) SeqResult

MakeSeqResult makes a SeqResult out of a slice.

func (SeqResult) Collect

func (s SeqResult) Collect() ([]KeyBytes, error)

Collect returns all elements in the result's sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.

func (SeqResult) First

func (s SeqResult) First() (KeyBytes, error)

First returns the first element from the result's sequence, if any. If the sequence is empty, it returns nil.

func (SeqResult) FirstN

func (s SeqResult) FirstN(n int) ([]KeyBytes, error)

FirstN returns the first n elements from the result's sequence.

func (SeqResult) IsEmpty added in v1.7.9

func (s SeqResult) IsEmpty() (bool, error)

IsEmpty returns true if the sequence in SeqResult is empty. It also checks for errors.

func (SeqResult) Limit added in v1.7.9

func (s SeqResult) Limit(n int) SeqResult

Limit limits the SeqResult's sequence to at most n elements.

func (SeqResult) MarshalLogArray

func (s SeqResult) MarshalLogArray(enc zapcore.ArrayEncoder) error

MarshalLogArray implements zapcore.ArrayMarshaler.

type SplitInfo

type SplitInfo struct {
	// 2 parts of the range
	Parts [2]RangeInfo
	// Middle point between the ranges
	Middle KeyBytes
}

SplitInfo contains information about a range split in two.
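The sketch below shows one way to split a range at its middle element, using the XOR fingerprint from the package overview. The key, rangeInfo and splitInfo types are simplified, hypothetical stand-ins: this page does not list RangeInfo's fields, and real keys are KeyBytes. Because the first part is [x, Middle) and the second is [Middle, y), the two fingerprints XOR back to the fingerprint of the whole range.

```go
package main

import "fmt"

// key is a hypothetical fixed-size key used to keep the sketch short.
type key [4]byte

// rangeInfo and splitInfo are simplified stand-ins for the real types.
type rangeInfo struct {
	Count       int
	Fingerprint key
}

type splitInfo struct {
	Parts  [2]rangeInfo
	Middle key
}

// fingerprint XORs all keys together.
func fingerprint(keys []key) key {
	var fp key
	for _, k := range keys {
		for i := range fp {
			fp[i] ^= k[i]
		}
	}
	return fp
}

// xorKeys combines two fingerprints.
func xorKeys(a, b key) key {
	for i := range a {
		a[i] ^= b[i]
	}
	return a
}

// split divides the sorted keys of a range [x, y) at the middle element.
func split(keys []key) (splitInfo, bool) {
	if len(keys) < 2 {
		return splitInfo{}, false // nothing to split
	}
	mid := len(keys) / 2
	return splitInfo{
		Parts: [2]rangeInfo{
			{Count: mid, Fingerprint: fingerprint(keys[:mid])},
			{Count: len(keys) - mid, Fingerprint: fingerprint(keys[mid:])},
		},
		Middle: keys[mid],
	}, true
}

func main() {
	keys := []key{{1}, {2}, {4}, {8}}
	si, _ := split(keys)
	fmt.Println(si.Parts[0].Count, si.Parts[1].Count, si.Middle[0]) // 2 2 4
	fmt.Println(xorKeys(si.Parts[0].Fingerprint, si.Parts[1].Fingerprint) == fingerprint(keys)) // true
}
```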

type SyncMessage

type SyncMessage interface {
	// Type returns the type of the message.
	Type() MessageType
	// X returns the beginning of the range.
	X() KeyBytes
	// Y returns the end of the range.
	Y() KeyBytes
	// Fingerprint returns the fingerprint of the range.
	Fingerprint() Fingerprint
	// Count returns the number of items in the range.
	Count() int
	// Keys returns the keys of the items in the range.
	Keys() []KeyBytes
	// Since returns the time since when the recent items are being sent.
	Since() time.Time
	// Sample returns the minhash sample of the items in the range.
	Sample() []MinhashSampleItem
}

SyncMessage is a message that is a part of the sync protocol.

type Tracer

type Tracer interface {
	// OnDumbSync is called when the difference metric exceeds maxDiff and the
	// dumb reconciliation process is used.
	OnDumbSync()
	// OnRecent is invoked when a Recent message is received.
	OnRecent(receivedItems, sentItems int)
}

Tracer tracks the reconciliation process.
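A Tracer can be as simple as a set of counters. The sketch below copies the documented interface locally and implements it with a hypothetical countingTracer. The mutex is a precaution in case several reconciliations share one tracer, since this page does not say whether the callbacks can be invoked concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// Tracer copies the method set documented above.
type Tracer interface {
	OnDumbSync()
	OnRecent(receivedItems, sentItems int)
}

// countingTracer accumulates reconciliation statistics.
type countingTracer struct {
	mu        sync.Mutex
	dumbSyncs int
	received  int
	sent      int
}

func (t *countingTracer) OnDumbSync() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.dumbSyncs++
}

func (t *countingTracer) OnRecent(receivedItems, sentItems int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.received += receivedItems
	t.sent += sentItems
}

// Compile-time check that countingTracer satisfies Tracer.
var _ Tracer = (*countingTracer)(nil)

func main() {
	t := &countingTracer{}
	var tr Tracer = t
	tr.OnRecent(10, 3)
	tr.OnDumbSync()
	fmt.Println(t.dumbSyncs, t.received, t.sent) // 1 10 3
}
```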

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.
