Overview
Package rangesync implements a pairwise set reconciliation protocol.
The protocol is based on the paper "Range-Based Set Reconciliation" by Aljoscha Meyer: https://arxiv.org/pdf/2212.13567.pdf
An advantage of the protocol is that it makes it possible to build reusable synchronization helper structures (ordered sets) and to use the same structure to efficiently reconcile subranges of the ordered sets against multiple peers. The disadvantage is that the algorithm is not very efficient for large differences (such as tens of thousands of elements) when the elements have no apparent ordering, which is the case with hash-based IDs such as ATX IDs. To mitigate this problem, the protocol supports exchanging recently received items between peers before beginning the actual reconciliation. When the difference is large, the algorithm can automatically degrade to a dumb "send me the whole set" mode. The resulting increased load on the network and on the peer computers is mitigated by splitting the element range between multiple peers.
The core concepts are the following:
- Ordered sets. The sets to be reconciled are ordered. The elements of a set are also called "keys" because they represent the IDs of the actual objects being synchronized, such as ATXs.
- Ranges. A range [x, y) denotes a range of items in the set, where x is inclusive and y is exclusive. Ranges may wrap around:
  - x == y: the whole set
  - x < y: a normal range starting with x and ending below y
  - x > y: a wrapped-around range, that is, from x (inclusive) to the end of the set and from the beginning of the set to y (exclusive)
- Fingerprint. Each range has a fingerprint, which is all the IDs (keys) in the range XORed together. The fingerprint is used to quickly check whether the range is in sync with the peer.
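The range semantics above can be expressed as a membership test. The following is a minimal sketch, assuming keys are ordered lexicographically as byte strings (as compared by bytes.Compare); the helper inRange is hypothetical and not part of the package API.

```go
package main

import (
	"bytes"
	"fmt"
)

// inRange reports whether key k belongs to the range [x, y), following the
// wrap-around convention described above. Hypothetical helper.
func inRange(k, x, y []byte) bool {
	switch c := bytes.Compare(x, y); {
	case c == 0:
		// x == y: the range covers the whole set.
		return true
	case c < 0:
		// x < y: a normal range, x <= k < y.
		return bytes.Compare(k, x) >= 0 && bytes.Compare(k, y) < 0
	default:
		// x > y: a wrapped-around range, k >= x or k < y.
		return bytes.Compare(k, x) >= 0 || bytes.Compare(k, y) < 0
	}
}

func main() {
	x, y := []byte{0x80}, []byte{0x20}
	fmt.Println(inRange([]byte{0x90}, x, y)) // true: from x to the end of the set
	fmt.Println(inRange([]byte{0x10}, x, y)) // true: from the start of the set to y
	fmt.Println(inRange([]byte{0x50}, x, y)) // false: outside the wrapped range
}
```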
Each OrderedSet is expected to provide the number of items and the fingerprint of the items in a range relatively cheaply. Additionally, OrderedSet has a Recent method that retrieves recently added keys (elements) based on the provided timestamp. This reconciliation helper mechanism (sync priming) is optional, so Recent may simply return an empty sequence.
Below are logs of sample interactions between two peers, A and B, for different combinations of set contents and settings:
A: empty set; B: empty set
A -> B:
EmptySet EndRound
B -> A:
Done
A: empty set; B: non-empty set
A -> B:
EmptySet EndRound
B -> A:
ItemBatch ItemBatch ...
A -> B:
Done
A: small set (< maxSendRange); B: non-empty set
A -> B:
ItemBatch ItemBatch ... RangeContents [x, y) EndRound
B -> A:
ItemBatch ItemBatch ...
A -> B:
Done
A: large set; B: non-empty set; maxDiff < 0
A -> B:
Fingerprint [x, y) EndRound
B -> A:
Fingerprint [x, m) Fingerprint [m, y) EndRound
A -> B:
ItemBatch ItemBatch ... RangeContents [x, m) EndRound
B -> A:
Done
A: large set; B: non-empty set; maxDiff >= 0; differenceMetric <= maxDiff
NOTE: Sample includes fingerprint
A -> B:
Sample [x, y) EndRound
B -> A:
Fingerprint [x, m) Fingerprint [m, y) EndRound
A -> B:
ItemBatch ItemBatch ... RangeContents [x, m) EndRound
B -> A:
Done
A: large set; B: non-empty set; maxDiff >= 0; differenceMetric > maxDiff
A -> B:
Sample [x, y) EndRound
B -> A:
ItemBatch ItemBatch ... RangeContents [x, y) EndRound
A -> B:
Done
A: large set; B: non-empty set; sync priming; maxDiff >= 0; differenceMetric <= maxDiff (after priming)
A -> B:
ItemBatch ItemBatch ... Recent EndRound
B -> A:
ItemBatch ItemBatch ... Sample [x, y) EndRound
A -> B:
Fingerprint [x, m) Fingerprint [m, y) EndRound
B -> A:
ItemBatch ItemBatch ... RangeContents [x, m) EndRound
A -> B:
Done
A: large set; B: non-empty set; sync priming; maxDiff < 0
A -> B:
ItemBatch ItemBatch ... Recent EndRound
B -> A:
ItemBatch ItemBatch ... Fingerprint [x, y) EndRound
A -> B:
Fingerprint [x, m) Fingerprint [m, y) EndRound
B -> A:
ItemBatch ItemBatch ... RangeContents [x, m) EndRound
A -> B:
Done
Index
- Constants
- Variables
- func CalcSim(a, b []MinhashSampleItem) float64
- func SyncMessageToString(m SyncMessage) string
- type CompactHash
- type Conduit
- type Dispatcher
- type DoneMessage
- type DumbSet
- func (ds *DumbSet) Add(id KeyBytes) error
- func (ds *DumbSet) AddReceived()
- func (ds *DumbSet) AddUnchecked(id KeyBytes)
- func (ds *DumbSet) Advance() error
- func (ds *DumbSet) EnsureLoaded() error
- func (ds *DumbSet) Has(k KeyBytes) (bool, error)
- func (ds *DumbSet) Loaded() bool
- func (ds *DumbSet) RangeInfo(x, y KeyBytes) (RangeInfo, error)
- func (ds *DumbSet) Receive(id KeyBytes) error
- func (ds *DumbSet) Received() SeqResult
- func (ds *DumbSet) Recent(since time.Time) (SeqResult, int)
- func (ds *DumbSet) Release()
- func (ds *DumbSet) SetAllowMultiReceive(allow bool)
- func (ds *DumbSet) SetInfo() (RangeInfo, error)
- func (ds *DumbSet) SplitRange(x, y KeyBytes, count int) (SplitInfo, error)
- func (ds *DumbSet) WithCopy(toCall func(OrderedSet) error) error
- type EmptyRangeMessage
- func (m *EmptyRangeMessage) Count() int
- func (t *EmptyRangeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *EmptyRangeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *EmptyRangeMessage) Fingerprint() Fingerprint
- func (m *EmptyRangeMessage) Keys() []KeyBytes
- func (m *EmptyRangeMessage) Sample() []MinhashSampleItem
- func (m *EmptyRangeMessage) Since() time.Time
- func (m *EmptyRangeMessage) Type() MessageType
- func (m *EmptyRangeMessage) X() KeyBytes
- func (m *EmptyRangeMessage) Y() KeyBytes
- type EmptySetMessage
- type EndRoundMessage
- type Fingerprint
- type FingerprintMessage
- func (m *FingerprintMessage) Count() int
- func (t *FingerprintMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *FingerprintMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *FingerprintMessage) Fingerprint() Fingerprint
- func (m *FingerprintMessage) Keys() []KeyBytes
- func (m *FingerprintMessage) Sample() []MinhashSampleItem
- func (m *FingerprintMessage) Since() time.Time
- func (m *FingerprintMessage) Type() MessageType
- func (m *FingerprintMessage) X() KeyBytes
- func (m *FingerprintMessage) Y() KeyBytes
- type Handler
- type ItemBatchMessage
- func (m *ItemBatchMessage) Count() int
- func (t *ItemBatchMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *ItemBatchMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *ItemBatchMessage) Fingerprint() Fingerprint
- func (m *ItemBatchMessage) Keys() []KeyBytes
- func (m *ItemBatchMessage) Sample() []MinhashSampleItem
- func (m *ItemBatchMessage) Since() time.Time
- func (m *ItemBatchMessage) Type() MessageType
- func (m *ItemBatchMessage) X() KeyBytes
- func (m *ItemBatchMessage) Y() KeyBytes
- type KeyBytes
- func (k KeyBytes) BitFromLeft(i int) bool
- func (k KeyBytes) Clone() KeyBytes
- func (k KeyBytes) Compare(other KeyBytes) int
- func (k KeyBytes) Inc() (overflow bool)
- func (k KeyBytes) IsZero() bool
- func (k KeyBytes) ShortString() string
- func (k KeyBytes) String() string
- func (k KeyBytes) Trim(bit int)
- func (k KeyBytes) Zero()
- type KeyCollection
- type Marker
- type MessageType
- type MinhashSampleItem
- type OrderedSet
- type PairwiseSetSyncer
- func (pss *PairwiseSetSyncer) Probe(ctx context.Context, peer p2p.Peer, os OrderedSet, x, y KeyBytes) (pr ProbeResult, err error)
- func (pss *PairwiseSetSyncer) Received() int
- func (pss *PairwiseSetSyncer) Register(d *Dispatcher, os OrderedSet)
- func (pss *PairwiseSetSyncer) Sent() int
- func (pss *PairwiseSetSyncer) Serve(ctx context.Context, stream io.ReadWriter, os OrderedSet) error
- func (pss *PairwiseSetSyncer) Sync(ctx context.Context, peer p2p.Peer, os OrderedSet, x, y KeyBytes) error
- type ProbeMessage
- func (m *ProbeMessage) Count() int
- func (t *ProbeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *ProbeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *ProbeMessage) Fingerprint() Fingerprint
- func (m *ProbeMessage) Keys() []KeyBytes
- func (m *ProbeMessage) Sample() []MinhashSampleItem
- func (m *ProbeMessage) Since() time.Time
- func (m *ProbeMessage) Type() MessageType
- func (m *ProbeMessage) X() KeyBytes
- func (m *ProbeMessage) Y() KeyBytes
- type ProbeResult
- type RangeContentsMessage
- func (m *RangeContentsMessage) Count() int
- func (t *RangeContentsMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *RangeContentsMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *RangeContentsMessage) Fingerprint() Fingerprint
- func (m *RangeContentsMessage) Keys() []KeyBytes
- func (m *RangeContentsMessage) Sample() []MinhashSampleItem
- func (m *RangeContentsMessage) Since() time.Time
- func (m *RangeContentsMessage) Type() MessageType
- func (m *RangeContentsMessage) X() KeyBytes
- func (m *RangeContentsMessage) Y() KeyBytes
- type RangeInfo
- type RangeSetReconciler
- func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit, info RangeInfo) (pr ProbeResult, err error)
- func (rsr *RangeSetReconciler) Initiate(c Conduit, x, y KeyBytes) error
- func (rsr *RangeSetReconciler) InitiateProbe(c Conduit, x, y KeyBytes) (RangeInfo, error)
- func (rsr *RangeSetReconciler) Run(c Conduit) error
- type RangeSetReconcilerConfig
- type RecentMessage
- func (m *RecentMessage) Count() int
- func (t *RecentMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *RecentMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *RecentMessage) Fingerprint() Fingerprint
- func (m *RecentMessage) Keys() []KeyBytes
- func (m *RecentMessage) Sample() []MinhashSampleItem
- func (m *RecentMessage) Since() time.Time
- func (m *RecentMessage) Type() MessageType
- func (m *RecentMessage) X() KeyBytes
- func (m *RecentMessage) Y() KeyBytes
- type Requester
- type SampleMessage
- func (m *SampleMessage) Count() int
- func (t *SampleMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *SampleMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (m *SampleMessage) Fingerprint() Fingerprint
- func (m *SampleMessage) Keys() []KeyBytes
- func (m *SampleMessage) Sample() []MinhashSampleItem
- func (m *SampleMessage) Since() time.Time
- func (m *SampleMessage) Type() MessageType
- func (m *SampleMessage) X() KeyBytes
- func (m *SampleMessage) Y() KeyBytes
- type Seq
- type SeqErrorFunc
- type SeqResult
- type SplitInfo
- type SyncMessage
- type Tracer
Constants
const (
	DefaultMaxSendRange  = 1
	DefaultItemChunkSize = 1024
	DefaultSampleSize    = 200
)
const (
// FingerprintSize is the size of a fingerprint in bytes.
FingerprintSize = 12
)
Variables
var (
	ErrTrafficLimitExceeded = errors.New("sync traffic limit exceeded")
	ErrMessageLimitExceeded = errors.New("sync message limit exceeded")
)
Functions
func CalcSim
func CalcSim(a, b []MinhashSampleItem) float64
CalcSim estimates the Jaccard similarity coefficient between two sets based on samples derived from the N lowest-valued elements of each set. The return value is in the 0..1 range, with 0 meaning almost no intersection and 1 meaning the sets are mostly equal. The precision of the estimate suffers if neither set is empty and the sets differ in size, with the return value tending to be lower than the actual Jaccard coefficient.
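As an illustration of the kind of estimate CalcSim produces, the sketch below compares two samples by counting the values they share and dividing by the size of the larger sample. This is a simplified, hypothetical estimator (calcSimSketch), not necessarily the package's implementation; the local MinhashSampleItem type only mirrors the package type.

```go
package main

import (
	"fmt"
	"sort"
)

// MinhashSampleItem mirrors the package type of the same name.
type MinhashSampleItem uint32

// calcSimSketch estimates the Jaccard similarity of two sets from their
// samples by counting shared values and dividing by the larger sample size.
// Hypothetical, simplified estimator.
func calcSimSketch(a, b []MinhashSampleItem) float64 {
	if len(a) == 0 && len(b) == 0 {
		return 1 // two empty sets are treated as equal
	}
	// Sort copies so that the caller's slices are left untouched.
	sa := append([]MinhashSampleItem(nil), a...)
	sb := append([]MinhashSampleItem(nil), b...)
	sort.Slice(sa, func(i, j int) bool { return sa[i] < sa[j] })
	sort.Slice(sb, func(i, j int) bool { return sb[i] < sb[j] })
	// Count common values with a merge-style walk over both sorted samples.
	common := 0
	for i, j := 0, 0; i < len(sa) && j < len(sb); {
		switch {
		case sa[i] == sb[j]:
			common++
			i++
			j++
		case sa[i] < sb[j]:
			i++
		default:
			j++
		}
	}
	n := len(sa)
	if len(sb) > n {
		n = len(sb)
	}
	return float64(common) / float64(n)
}

func main() {
	a := []MinhashSampleItem{1, 2, 3, 4}
	b := []MinhashSampleItem{4, 3, 5, 6}
	fmt.Println(calcSimSketch(a, b)) // 0.5
}
```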
func SyncMessageToString
func SyncMessageToString(m SyncMessage) string
SyncMessageToString returns a string representation of a sync message.
Types
type CompactHash
type CompactHash struct {
H KeyBytes
}
CompactHash encodes hashes in a compact form, skipping trailing zeroes. It also supports a nil hash (no value). The encoding format is as follows:
- byte 0: spec byte
- bytes 1..n: data bytes
The format of the spec byte is as follows:
- bits 0..5: number of leading bytes that remain after the trailing zero bytes are skipped (n)
- bits 6..7: hash type
The following hash types are supported:
- 0: nil hash
- 1: 32-byte hash
- 2, 3: reserved
NOTE: when adding new hash types, we need to add a mechanism that makes sure that every received hash is of the expected type. Alternatively, we need to add some kind of context to the scale.Decoder / scale.Encoder, which may contain the size of hashes to be used.
func (*CompactHash) DecodeScale
func (c *CompactHash) DecodeScale(dec *scale.Decoder) (int, error)
DecodeScale implements scale.Decodable.
func (*CompactHash) EncodeScale
func (c *CompactHash) EncodeScale(enc *scale.Encoder) (int, error)
EncodeScale implements scale.Encodable.
func (*CompactHash) ToOrdered
func (c *CompactHash) ToOrdered() KeyBytes
type Conduit
type Conduit interface {
// NextMessage returns the next SyncMessage, or nil if there are no more
// SyncMessages for this session.
NextMessage() (SyncMessage, error)
// Send sends a SyncMessage to the peer.
Send(SyncMessage) error
}
Conduit handles receiving and sending peer messages.
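A Conduit only needs to deliver incoming messages and accept outgoing ones, which makes it straightforward to fake in tests. The sketch below is a hypothetical in-memory Conduit; SyncMessage is replaced by a minimal stand-in interface, so the types are not interchangeable with the package's own.

```go
package main

import "fmt"

// SyncMessage is a minimal stand-in for the package's SyncMessage interface
// (hypothetical: only a name accessor is modeled).
type SyncMessage interface {
	Name() string
}

// msg is a trivial SyncMessage carrying only a message type name.
type msg string

func (m msg) Name() string { return string(m) }

// Conduit has the same method set as the interface shown above.
type Conduit interface {
	NextMessage() (SyncMessage, error)
	Send(SyncMessage) error
}

// memConduit is a hypothetical in-memory Conduit: incoming messages are
// read from in, and sent messages are appended to out.
type memConduit struct {
	in  []SyncMessage
	out []SyncMessage
}

// NextMessage returns the next queued message, or nil when the session has
// no more messages.
func (c *memConduit) NextMessage() (SyncMessage, error) {
	if len(c.in) == 0 {
		return nil, nil
	}
	m := c.in[0]
	c.in = c.in[1:]
	return m, nil
}

// Send records the outgoing message.
func (c *memConduit) Send(m SyncMessage) error {
	c.out = append(c.out, m)
	return nil
}

// Compile-time check that memConduit satisfies Conduit.
var _ Conduit = (*memConduit)(nil)

func main() {
	c := &memConduit{in: []SyncMessage{msg("EmptySet"), msg("EndRound")}}
	for {
		m, err := c.NextMessage()
		if err != nil || m == nil {
			break // end of session
		}
		fmt.Println("received", m.Name())
	}
	_ = c.Send(msg("Done"))
	fmt.Println("sent", len(c.out), "message(s)")
}
```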
type Dispatcher
Dispatcher multiplexes a P2P Server to multiple set reconcilers.
func NewDispatcher
func NewDispatcher(logger *zap.Logger) *Dispatcher
NewDispatcher creates a new Dispatcher.
func (*Dispatcher) Dispatch
func (d *Dispatcher) Dispatch(
	ctx context.Context,
	peer p2p.Peer,
	req []byte,
	stream io.ReadWriter,
) (err error)
Dispatch dispatches a request to a handler.
func (*Dispatcher) Register
func (d *Dispatcher) Register(name string, h Handler)
Register registers a handler with a Dispatcher.
func (*Dispatcher) SetupServer
func (d *Dispatcher) SetupServer(
	host host.Host,
	proto string,
	opts ...server.Opt,
) *server.Server
SetupServer creates a new P2P Server for the Dispatcher.
type DoneMessage
type DoneMessage struct{ Marker }
DoneMessage is a SyncMessage that denotes the end of the synchronization. The peer should stop any further processing after receiving this message.
func (*DoneMessage) Type
func (*DoneMessage) Type() MessageType
type DumbSet (added in v1.7.7)
type DumbSet struct {
FPFunc func(items []KeyBytes) Fingerprint
// contains filtered or unexported fields
}
DumbSet is a simple OrderedSet implementation that doesn't include any optimizations. It is intended to be used only in tests.
func (*DumbSet) AddReceived (added in v1.7.7)
func (ds *DumbSet) AddReceived()
AddReceived adds all the received items to the set.
func (*DumbSet) AddUnchecked (added in v1.7.7)
AddUnchecked adds an item to the set without registering it for the checks performed by Add and Receive.
func (*DumbSet) EnsureLoaded (added in v1.7.7)
EnsureLoaded implements OrderedSet.
func (*DumbSet) Release (added in v1.7.7)
func (ds *DumbSet) Release()
Release implements OrderedSet.
func (*DumbSet) SetAllowMultiReceive (added in v1.7.7)
SetAllowMultiReceive sets whether the set allows receiving the same item multiple times.
func (*DumbSet) SplitRange (added in v1.7.7)
SplitRange implements OrderedSet.
type EmptyRangeMessage
type EmptyRangeMessage struct {
RangeX, RangeY CompactHash
}
EmptyRangeMessage notifies the peer that it needs to send all of its items in the specified range.
func (*EmptyRangeMessage) Count
func (m *EmptyRangeMessage) Count() int
func (*EmptyRangeMessage) DecodeScale
func (t *EmptyRangeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*EmptyRangeMessage) EncodeScale
func (t *EmptyRangeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*EmptyRangeMessage) Fingerprint
func (m *EmptyRangeMessage) Fingerprint() Fingerprint
func (*EmptyRangeMessage) Keys
func (m *EmptyRangeMessage) Keys() []KeyBytes
func (*EmptyRangeMessage) Sample
func (m *EmptyRangeMessage) Sample() []MinhashSampleItem
func (*EmptyRangeMessage) Since
func (m *EmptyRangeMessage) Since() time.Time
func (*EmptyRangeMessage) Type
func (m *EmptyRangeMessage) Type() MessageType
func (*EmptyRangeMessage) X
func (m *EmptyRangeMessage) X() KeyBytes
func (*EmptyRangeMessage) Y
func (m *EmptyRangeMessage) Y() KeyBytes
type EmptySetMessage
type EmptySetMessage struct{ Marker }
EmptySetMessage is a SyncMessage that denotes an empty set, requesting the peer to send all of its items.
func (*EmptySetMessage) Type
func (*EmptySetMessage) Type() MessageType
type EndRoundMessage
type EndRoundMessage struct{ Marker }
EndRoundMessage is a SyncMessage that denotes the end of the sync round.
func (*EndRoundMessage) Type
func (*EndRoundMessage) Type() MessageType
type Fingerprint
type Fingerprint [FingerprintSize]byte
Fingerprint represents a fingerprint of a set of keys. The fingerprint is obtained by XORing together the keys in the set.
func CombineFingerprints (added in v1.7.7)
func CombineFingerprints(a, b Fingerprint) Fingerprint
CombineFingerprints combines two fingerprints into one.
func EmptyFingerprint
func EmptyFingerprint() Fingerprint
EmptyFingerprint returns an empty fingerprint.
func MustParseHexFingerprint
func MustParseHexFingerprint(s string) Fingerprint
MustParseHexFingerprint converts a hex string to Fingerprint.
func RandomFingerprint
func RandomFingerprint() Fingerprint
RandomFingerprint generates a random fingerprint.
func (*Fingerprint) BitFromLeft
func (fp *Fingerprint) BitFromLeft(i int) bool
BitFromLeft returns the i-th bit from the left in the fingerprint.
func (Fingerprint) Compare
func (fp Fingerprint) Compare(other Fingerprint) int
Compare compares two fingerprints.
func (Fingerprint) ShortString
func (fp Fingerprint) ShortString() string
ShortString implements log.ShortString.
func (*Fingerprint) Update
func (fp *Fingerprint) Update(h []byte)
Update includes the byte slice in the fingerprint.
type FingerprintMessage
type FingerprintMessage struct {
RangeX, RangeY CompactHash
RangeFingerprint Fingerprint
NumItems uint32
}
FingerprintMessage contains the fingerprint of the range [RangeX, RangeY) for comparison against the peer's fingerprint of the range with the same bounds.
func (*FingerprintMessage) Count
func (m *FingerprintMessage) Count() int
func (*FingerprintMessage) DecodeScale
func (t *FingerprintMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*FingerprintMessage) EncodeScale
func (t *FingerprintMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*FingerprintMessage) Fingerprint
func (m *FingerprintMessage) Fingerprint() Fingerprint
func (*FingerprintMessage) Keys
func (m *FingerprintMessage) Keys() []KeyBytes
func (*FingerprintMessage) Sample
func (m *FingerprintMessage) Sample() []MinhashSampleItem
func (*FingerprintMessage) Since
func (m *FingerprintMessage) Since() time.Time
func (*FingerprintMessage) Type
func (m *FingerprintMessage) Type() MessageType
func (*FingerprintMessage) X
func (m *FingerprintMessage) X() KeyBytes
func (*FingerprintMessage) Y
func (m *FingerprintMessage) Y() KeyBytes
type ItemBatchMessage
type ItemBatchMessage struct {
ContentKeys KeyCollection `scale:"max=1024"`
}
ItemBatchMessage denotes a batch of items to be added to the peer's set.
func (*ItemBatchMessage) Count
func (m *ItemBatchMessage) Count() int
func (*ItemBatchMessage) DecodeScale
func (t *ItemBatchMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*ItemBatchMessage) EncodeScale
func (t *ItemBatchMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*ItemBatchMessage) Fingerprint
func (m *ItemBatchMessage) Fingerprint() Fingerprint
func (*ItemBatchMessage) Keys
func (m *ItemBatchMessage) Keys() []KeyBytes
func (*ItemBatchMessage) Sample
func (m *ItemBatchMessage) Sample() []MinhashSampleItem
func (*ItemBatchMessage) Since
func (m *ItemBatchMessage) Since() time.Time
func (*ItemBatchMessage) Type
func (m *ItemBatchMessage) Type() MessageType
func (*ItemBatchMessage) X
func (m *ItemBatchMessage) X() KeyBytes
func (*ItemBatchMessage) Y
func (m *ItemBatchMessage) Y() KeyBytes
type KeyBytes
type KeyBytes []byte
KeyBytes represents an item (key) in a reconcilable set.
func MustParseHexKeyBytes
MustParseHexKeyBytes converts a hex string to KeyBytes.
func RandomKeyBytes
RandomKeyBytes generates random data in bytes for testing.
func (KeyBytes) BitFromLeft (added in v1.7.7)
BitFromLeft returns the i-th bit from the left in the key.
func (KeyBytes) Inc
Inc increments the key by one in place, keeping the same number of bytes. It returns true if the increment has caused an overflow.
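Because keys are ordered as byte strings, incrementing a key by one corresponds to treating it as a big-endian unsigned integer and propagating the carry from the last byte towards the first. A minimal sketch of that behavior, using a hypothetical helper inc on a plain byte slice rather than the package's KeyBytes.Inc:

```go
package main

import "fmt"

// inc increments k in place as a big-endian unsigned integer, which matches
// lexicographic byte ordering, and reports whether it overflowed.
// Hypothetical helper mirroring the described behavior of KeyBytes.Inc.
func inc(k []byte) (overflow bool) {
	for i := len(k) - 1; i >= 0; i-- {
		k[i]++
		if k[i] != 0 {
			return false // no carry into the preceding byte
		}
	}
	return true // every byte wrapped around to zero
}

func main() {
	k := []byte{0x01, 0xff}
	fmt.Println(inc(k), k) // false [2 0]
	m := []byte{0xff, 0xff}
	fmt.Println(inc(m), m) // true [0 0]
}
```

In this sketch, an overflow leaves the key as all zeroes, that is, the smallest possible key of that length.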
func (KeyBytes) ShortString
ShortString implements log.ShortString.
type KeyCollection
type KeyCollection struct {
Keys []KeyBytes
}
KeyCollection represents a collection of keys of the same size.
func (*KeyCollection) DecodeScale
func (c *KeyCollection) DecodeScale(dec *scale.Decoder) (int, error)
DecodeScale implements scale.Decodable.
func (*KeyCollection) EncodeScale
func (c *KeyCollection) EncodeScale(enc *scale.Encoder) (int, error)
EncodeScale implements scale.Encodable.
type Marker
type Marker struct{}
func (*Marker) Fingerprint
func (*Marker) Fingerprint() Fingerprint
func (*Marker) Sample
func (*Marker) Sample() []MinhashSampleItem
type MessageType
type MessageType byte
MessageType specifies the type of a sync message.
const (
	// Done message is sent to indicate the completion of the whole sync run.
	MessageTypeDone MessageType = iota
	// EndRoundMessage is sent to indicate the completion of a single round.
	MessageTypeEndRound
	// EmptySetMessage is sent to indicate that the set is empty. In response, the
	// receiving side sends its whole set using ItemBatch and RangeContents messages.
	MessageTypeEmptySet
	// EmptyRangeMessage is sent to indicate that the specified range is empty. In
	// response, the receiving side sends the contents of the range using ItemBatch
	// and RangeContents messages.
	MessageTypeEmptyRange
	// Fingerprint carries a range fingerprint. Depending on the local fingerprint of
	// the same range, the receiving side may not reply to this message (range
	// completed), may send the contents of the range using ItemBatch and
	// RangeContents messages if the range is small enough, or it can split the range
	// in two and send back the Fingerprint messages for each of the resulting parts.
	MessageTypeFingerprint
	// RangeContents message is sent after ItemBatch messages to indicate what range
	// the messages belong to. If the receiving side has any items within the same
	// range, these items are sent back using ItemBatch and RangeContents messages.
	MessageTypeRangeContents
	// ItemBatchMessage is sent to carry a batch of items.
	MessageTypeItemBatch
	// ProbeMessage is sent to request the count of items and an estimate of the
	// Jaccard similarity coefficient for the specified range.
	MessageTypeProbe
	// Sample message carries a minhash sample along with the fingerprint and
	// number of items within the specified range.
	MessageTypeSample
	// Recent message is sent after ItemBatch messages to indicate that the batches
	// contain items recently added to the set.
	MessageTypeRecent
)
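The handling of a Fingerprint message described above can be sketched as a decision function on the receiving side. This is hypothetical: onFingerprint and the action values are illustrative names, using the local item count compared against MaxSendRange as the "small enough" criterion is an assumption, and the EmptyRange reply for an empty local range is inferred from the EmptyRange message description.

```go
package main

import "fmt"

// action is the receiver's reply to a Fingerprint message (hypothetical).
type action int

const (
	rangeDone    action = iota // fingerprints match: the range is in sync
	requestRange               // local range is empty: reply with EmptyRange
	sendContents               // small range: reply with ItemBatch + RangeContents
	splitRange                 // otherwise: reply with two Fingerprint messages
)

func (a action) String() string {
	return [...]string{"done", "empty range", "send contents", "split"}[a]
}

// onFingerprint picks a reply based on the remote and local fingerprints of
// the same range, the local item count, and the MaxSendRange setting.
func onFingerprint(remoteFP, localFP [12]byte, localCount, maxSendRange int) action {
	switch {
	case remoteFP == localFP:
		return rangeDone
	case localCount == 0:
		return requestRange
	case localCount <= maxSendRange:
		return sendContents
	default:
		return splitRange
	}
}

func main() {
	var a, b [12]byte
	b[0] = 1
	fmt.Println(onFingerprint(a, a, 1000, 16)) // done
	fmt.Println(onFingerprint(a, b, 0, 16))    // empty range
	fmt.Println(onFingerprint(a, b, 10, 16))   // send contents
	fmt.Println(onFingerprint(a, b, 1000, 16)) // split
}
```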
type MinhashSampleItem
type MinhashSampleItem uint32
MinhashSampleItem represents an item of a minhash sample subset.
func MinhashSampleItemFromKeyBytes
func MinhashSampleItemFromKeyBytes(h KeyBytes) MinhashSampleItem
MinhashSampleItemFromKeyBytes uses the lower 32 bits of a hash as a MinhashSampleItem.
func Sample
func Sample(sr SeqResult, count, sampleSize int) ([]MinhashSampleItem, error)
Sample retrieves min(count, sampleSize) items from the ordered sequence, extracting a MinhashSampleItem from each value.
func (MinhashSampleItem) Compare
func (m MinhashSampleItem) Compare(other MinhashSampleItem) int
func (*MinhashSampleItem) DecodeScale
func (m *MinhashSampleItem) DecodeScale(d *scale.Decoder) (int, error)
DecodeScale implements scale.Decodable.
func (MinhashSampleItem) EncodeScale
func (m MinhashSampleItem) EncodeScale(e *scale.Encoder) (int, error)
EncodeScale implements scale.Encodable.
func (MinhashSampleItem) String
func (m MinhashSampleItem) String() string
type OrderedSet
type OrderedSet interface {
// Add adds a new key to the set.
// It should not perform any additional actions related to handling
// the received key.
Add(k KeyBytes) error
// Receive handles a new key received from the peer.
// It should not add the key to the set.
Receive(k KeyBytes) error
// Received returns the sequence containing all the items received from the peer.
// Unlike other methods, SeqResult returned by Received called on a copy of the
// OrderedSet passed to WithCopy callback is expected to be valid outside of the
// callback as well.
Received() SeqResult
// RangeInfo returns RangeInfo for the item range in the ordered set,
// bounded by [x, y).
// x == y indicates the whole set.
// x < y indicates a normal range starting with x and ending below y.
// x > y indicates a wrapped around range, that is from x (inclusive) to the end
// of the set and from the beginning of the set to y, non-inclusive.
RangeInfo(x, y KeyBytes) (RangeInfo, error)
// SplitRange splits the range roughly after the specified count of items,
// returning RangeInfo for the first half and the second half of the range.
SplitRange(x, y KeyBytes, count int) (SplitInfo, error)
// SetInfo returns RangeInfo for the whole set.
SetInfo() (RangeInfo, error)
// WithCopy runs the specified function, passing to it a temporary shallow copy of
// the OrderedSet. The copy is discarded after the function returns, releasing
// any resources associated with it.
// The list of received items as returned by Received is inherited by the copy.
WithCopy(toCall func(OrderedSet) error) error
// Recent returns a sequence (SeqResult) that yields the items added since the specified
// timestamp. Some OrderedSet implementations may not have Recent implemented, in
// which case it should return an empty sequence.
Recent(since time.Time) (SeqResult, int)
// Loaded returns true if the set is loaded and ready for use.
Loaded() bool
// EnsureLoaded ensures that the set is loaded and ready for use.
// It may do nothing in case of in-memory sets, but may trigger loading
// from database in case of database-backed sets.
EnsureLoaded() error
// Advance advances the set by including the items since the set was last loaded
// or advanced.
Advance() error
// Has returns true if the specified key is present in OrderedSet.
Has(KeyBytes) (bool, error)
}
OrderedSet represents a set that can be synced against a remote peer. OrderedSet methods are not thread-safe, except for WithCopy, Loaded and EnsureLoaded. SeqResult values obtained by method calls on an OrderedSet passed to a WithCopy callback are valid only within the callback and should not be used outside of it, with the exception of the SeqResult returned by Received, which is expected to remain valid outside of the callback as well.
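To illustrate the RangeInfo and SplitRange semantics, the sketch below keeps keys in a sorted slice and answers range queries with wrap-around. It is a hypothetical toy, not an implementation of the OrderedSet interface: sortedSet, items and split are illustrative names, fingerprints are omitted, and split cuts exactly after count items rather than "roughly".

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// sortedSet is a hypothetical toy set that keeps its keys sorted.
type sortedSet struct{ keys [][]byte }

// lowerBound returns the index of the first key >= k.
func (s *sortedSet) lowerBound(k []byte) int {
	return sort.Search(len(s.keys), func(i int) bool { return bytes.Compare(s.keys[i], k) >= 0 })
}

// add inserts k, keeping the keys sorted and unique.
func (s *sortedSet) add(k []byte) {
	i := s.lowerBound(k)
	if i < len(s.keys) && bytes.Equal(s.keys[i], k) {
		return
	}
	s.keys = append(s.keys, nil)
	copy(s.keys[i+1:], s.keys[i:])
	s.keys[i] = k
}

// inRange reports whether k is in [x, y) with wrap-around (x == y: whole set).
func inRange(k, x, y []byte) bool {
	switch c := bytes.Compare(x, y); {
	case c == 0:
		return true
	case c < 0:
		return bytes.Compare(k, x) >= 0 && bytes.Compare(k, y) < 0
	default:
		return bytes.Compare(k, x) >= 0 || bytes.Compare(k, y) < 0
	}
}

// items returns the keys in [x, y), starting at x and wrapping around the
// end of the set if needed.
func (s *sortedSet) items(x, y []byte) [][]byte {
	n := len(s.keys)
	start := s.lowerBound(x)
	var out [][]byte
	for j := 0; j < n; j++ {
		k := s.keys[(start+j)%n]
		if !inRange(k, x, y) {
			break // left the range
		}
		out = append(out, k)
	}
	return out
}

// split cuts [x, y) after count items, returning both parts and the key m
// such that the parts are [x, m) and [m, y). ok is false if no split is possible.
func (s *sortedSet) split(x, y []byte, count int) (first, second [][]byte, m []byte, ok bool) {
	all := s.items(x, y)
	if count <= 0 || count >= len(all) {
		return nil, nil, nil, false
	}
	return all[:count], all[count:], all[count], true
}

func main() {
	s := &sortedSet{}
	for _, b := range []byte{0x90, 0x10, 0x50, 0x30, 0x70} {
		s.add([]byte{b})
	}
	fmt.Println(len(s.items([]byte{0x30}, []byte{0x70}))) // 2 (0x30, 0x50)
	fmt.Println(len(s.items([]byte{0x80}, []byte{0x40}))) // 3 (0x90, 0x10, 0x30)
	first, second, m, ok := s.split([]byte{0x00}, []byte{0x00}, 2)
	fmt.Println(len(first), len(second), m, ok) // 2 3 [80] true
}
```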
type PairwiseSetSyncer
type PairwiseSetSyncer struct {
// contains filtered or unexported fields
}
func NewPairwiseSetSyncer
func NewPairwiseSetSyncer(
	logger *zap.Logger,
	r Requester,
	name string,
	cfg RangeSetReconcilerConfig,
) *PairwiseSetSyncer
func NewPairwiseSetSyncerInternal (added in v1.7.7)
func NewPairwiseSetSyncerInternal(
	logger *zap.Logger,
	r Requester,
	name string,
	cfg RangeSetReconcilerConfig,
	tracer Tracer,
	clock clockwork.Clock,
) *PairwiseSetSyncer
func (*PairwiseSetSyncer) Probe
func (pss *PairwiseSetSyncer) Probe(
	ctx context.Context,
	peer p2p.Peer,
	os OrderedSet,
	x, y KeyBytes,
) (pr ProbeResult, err error)
func (*PairwiseSetSyncer) Received
func (pss *PairwiseSetSyncer) Received() int
func (*PairwiseSetSyncer) Register
func (pss *PairwiseSetSyncer) Register(d *Dispatcher, os OrderedSet)
func (*PairwiseSetSyncer) Sent
func (pss *PairwiseSetSyncer) Sent() int
func (*PairwiseSetSyncer) Serve
func (pss *PairwiseSetSyncer) Serve(ctx context.Context, stream io.ReadWriter, os OrderedSet) error
func (*PairwiseSetSyncer) Sync
func (pss *PairwiseSetSyncer) Sync(
	ctx context.Context,
	peer p2p.Peer,
	os OrderedSet,
	x, y KeyBytes,
) error
type ProbeMessage
type ProbeMessage struct {
RangeX, RangeY CompactHash
RangeFingerprint Fingerprint
SampleSize uint32
}
ProbeMessage requests the fingerprint and item count of the range [RangeX, RangeY) from the peer, along with a minhash sample if the fingerprints differ.
func (*ProbeMessage) Count
func (m *ProbeMessage) Count() int
func (*ProbeMessage) DecodeScale
func (t *ProbeMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*ProbeMessage) EncodeScale
func (t *ProbeMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*ProbeMessage) Fingerprint
func (m *ProbeMessage) Fingerprint() Fingerprint
func (*ProbeMessage) Keys
func (m *ProbeMessage) Keys() []KeyBytes
func (*ProbeMessage) Sample
func (m *ProbeMessage) Sample() []MinhashSampleItem
func (*ProbeMessage) Since
func (m *ProbeMessage) Since() time.Time
func (*ProbeMessage) Type
func (m *ProbeMessage) Type() MessageType
func (*ProbeMessage) X
func (m *ProbeMessage) X() KeyBytes
func (*ProbeMessage) Y
func (m *ProbeMessage) Y() KeyBytes
type ProbeResult
type ProbeResult struct {
// True if the peer's range (or full set) is fully in sync with the local range
// (or full set).
// Note that Sim==1 does not guarantee that the peer is in sync because the
// minhash algorithm is not precise.
InSync bool
// Number of items in the range.
Count int
// An estimate of Jaccard similarity coefficient between the sets.
// The range is 0..1, 0 being mostly disjoint sets and 1 being mostly equal sets.
Sim float64
}
ProbeResult contains the result of a probe.
type RangeContentsMessage
type RangeContentsMessage struct {
RangeX, RangeY CompactHash
NumItems uint32
}
RangeContentsMessage denotes a range for which the set of items has been sent. The peer needs to send back any items it has in the same range bounded by [RangeX, RangeY).
func (*RangeContentsMessage) Count
func (m *RangeContentsMessage) Count() int
func (*RangeContentsMessage) DecodeScale
func (t *RangeContentsMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*RangeContentsMessage) EncodeScale
func (t *RangeContentsMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*RangeContentsMessage) Fingerprint
func (m *RangeContentsMessage) Fingerprint() Fingerprint
func (*RangeContentsMessage) Keys
func (m *RangeContentsMessage) Keys() []KeyBytes
func (*RangeContentsMessage) Sample
func (m *RangeContentsMessage) Sample() []MinhashSampleItem
func (*RangeContentsMessage) Since
func (m *RangeContentsMessage) Since() time.Time
func (*RangeContentsMessage) Type
func (m *RangeContentsMessage) Type() MessageType
func (*RangeContentsMessage) X
func (m *RangeContentsMessage) X() KeyBytes
func (*RangeContentsMessage) Y
func (m *RangeContentsMessage) Y() KeyBytes
type RangeInfo
type RangeInfo struct {
// Fingerprint of the interval
Fingerprint Fingerprint
// Number of items in the interval
Count int
// Items is the sequence of set elements in the interval.
Items SeqResult
}
RangeInfo contains information about a range of items in the OrderedSet as returned by OrderedSet.RangeInfo.
type RangeSetReconciler
type RangeSetReconciler struct {
// contains filtered or unexported fields
}
RangeSetReconciler reconciles two sets of items using the recursive set reconciliation protocol.
func NewRangeSetReconciler ¶
func NewRangeSetReconciler(logger *zap.Logger, cfg RangeSetReconcilerConfig, os OrderedSet) *RangeSetReconciler
NewRangeSetReconciler creates a new RangeSetReconciler.
func NewRangeSetReconcilerInternal ¶ added in v1.7.7
func NewRangeSetReconcilerInternal( logger *zap.Logger, cfg RangeSetReconcilerConfig, os OrderedSet, tracer Tracer, clock clockwork.Clock, ) *RangeSetReconciler
NewRangeSetReconcilerInternal creates a new RangeSetReconciler. It is only directly called by the tests. It accepts extra tracer and clock parameters.
func (*RangeSetReconciler) HandleProbeResponse
func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit, info RangeInfo) (pr ProbeResult, err error)
HandleProbeResponse processes the probe response message and returns the probe result. info is the range info returned by InitiateProbe.
func (*RangeSetReconciler) Initiate
func (rsr *RangeSetReconciler) Initiate(c Conduit, x, y KeyBytes) error
Initiate initiates the reconciliation process with the peer. If x and y are non-nil, the [x, y) range is reconciled. If x and y are nil, the whole set is reconciled.
func (*RangeSetReconciler) InitiateProbe
func (rsr *RangeSetReconciler) InitiateProbe(c Conduit, x, y KeyBytes) (RangeInfo, error)
InitiateProbe initiates a probe to retrieve the item count and Jaccard similarity coefficient from the peer.
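The probe relies on MinHash samples to estimate Jaccard similarity. The package's exact estimator is not documented here, so the sketch below shows the common bottom-k MinHash estimator as an illustration only. It assumes each sample holds the k smallest distinct hash values of a set in ascending order; the uint32 item type and the `jaccardEstimate` name are hypothetical.

```go
package main

import "fmt"

// jaccardEstimate is a hypothetical bottom-k MinHash estimator.
// a and b must be sorted ascending without duplicates and hold the
// k smallest hash values of their respective sets.
func jaccardEstimate(a, b []uint32, k int) float64 {
	i, j, taken, common := 0, 0, 0, 0
	// Walk both samples in merged order, stopping after the k smallest
	// values of the union, and count values present in both samples.
	for taken < k && (i < len(a) || j < len(b)) {
		switch {
		case j >= len(b) || (i < len(a) && a[i] < b[j]):
			i++
		case i >= len(a) || b[j] < a[i]:
			j++
		default: // a[i] == b[j]: the value belongs to both sets
			common++
			i++
			j++
		}
		taken++
	}
	if taken == 0 {
		return 1 // two empty sets are treated as identical
	}
	return float64(common) / float64(taken)
}

func main() {
	fmt.Println(jaccardEstimate([]uint32{1, 2, 3, 5}, []uint32{1, 2, 4, 5}, 4)) // 0.5
}
```

The estimate only needs the two samples, not the sets themselves, which is what makes it cheap to exchange in a probe; its accuracy improves with the sample size k.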
func (*RangeSetReconciler) Run
func (rsr *RangeSetReconciler) Run(c Conduit) error
Run performs a sync reconciliation run using the specified Conduit to send and receive messages.
type RangeSetReconcilerConfig added in v1.7.7
type RangeSetReconcilerConfig struct {
// Maximum range size to send instead of further subdividing the input range.
MaxSendRange uint `mapstructure:"max-send-range"`
// Size of the item chunk to use when sending the set items.
ItemChunkSize int `mapstructure:"item-chunk-size"`
// Size of the MinHash sample to be sent to the peer.
SampleSize uint `mapstructure:"sample-size"`
// Maximum set difference metric (0..1) allowed for recursive reconciliation, with
// a value of 0 meaning equal sets and 1 meaning completely disjoint sets. If the
// difference metric exceeds MaxReconcDiff, the whole set is transmitted instead of
// applying the recursive algorithm.
MaxReconcDiff float64 `mapstructure:"max-reconc-diff"`
// Time span for recent sync.
RecentTimeSpan time.Duration `mapstructure:"recent-time-span"`
// Traffic limit in bytes.
TrafficLimit int `mapstructure:"traffic-limit"`
// Message count limit.
MessageLimit int `mapstructure:"message-limit"`
}
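The following sketch illustrates how MaxSendRange and MaxReconcDiff might steer a single reconciliation step. It is a simplification under stated assumptions, not the package's control flow: it treats the difference metric as 1 minus the estimated Jaccard similarity, and the `reconcConfig` type, the `action` constants, and `chooseAction` are hypothetical. The values in `main` are illustrative and are not the package defaults.

```go
package main

import "fmt"

// reconcConfig is a hypothetical copy of two RangeSetReconcilerConfig fields.
type reconcConfig struct {
	MaxSendRange  uint
	MaxReconcDiff float64
}

type action int

const (
	sendItems    action = iota // send the range's items directly
	sendWholeSet               // fall back to transmitting the whole set
	splitRange                 // subdivide the range and recurse
)

// chooseAction picks the next step for a range holding count items whose
// estimated Jaccard similarity with the peer's range is similarity.
// Assumption: difference metric = 1 - similarity.
func chooseAction(cfg reconcConfig, count uint, similarity float64) action {
	switch {
	case count <= cfg.MaxSendRange:
		// Small ranges are cheaper to send than to subdivide further.
		return sendItems
	case 1-similarity > cfg.MaxReconcDiff:
		// The sets differ too much for recursion to pay off.
		return sendWholeSet
	default:
		return splitRange
	}
}

func main() {
	cfg := reconcConfig{MaxSendRange: 16, MaxReconcDiff: 0.9}
	fmt.Println(chooseAction(cfg, 10, 0.5))    // 0 (sendItems)
	fmt.Println(chooseAction(cfg, 1000, 0.05)) // 1 (sendWholeSet)
	fmt.Println(chooseAction(cfg, 1000, 0.5))  // 2 (splitRange)
}
```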
func DefaultConfig added in v1.7.7
func DefaultConfig() RangeSetReconcilerConfig
DefaultConfig returns the default configuration for the RangeSetReconciler.
type RecentMessage
type RecentMessage struct {
SinceTime uint64
}
RecentMessage is a SyncMessage that denotes a set of items that have been added to the peer's set since a specific point in time.
func (*RecentMessage) Count
func (m *RecentMessage) Count() int
func (*RecentMessage) DecodeScale
func (t *RecentMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*RecentMessage) EncodeScale
func (t *RecentMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*RecentMessage) Fingerprint
func (m *RecentMessage) Fingerprint() Fingerprint
func (*RecentMessage) Keys
func (m *RecentMessage) Keys() []KeyBytes
func (*RecentMessage) Sample
func (m *RecentMessage) Sample() []MinhashSampleItem
func (*RecentMessage) Since
func (m *RecentMessage) Since() time.Time
func (*RecentMessage) Type
func (m *RecentMessage) Type() MessageType
func (*RecentMessage) X
func (m *RecentMessage) X() KeyBytes
func (*RecentMessage) Y
func (m *RecentMessage) Y() KeyBytes
type SampleMessage
type SampleMessage struct {
RangeX, RangeY CompactHash
RangeFingerprint Fingerprint
NumItems uint32
// NOTE: max must be in sync with maxSampleSize in hashsync/rangesync.go
SampleItems []MinhashSampleItem `scale:"max=1000"`
}
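SampleMessage carries a MinHash sample of the set items in a range, together with the range bounds, the range fingerprint, and the number of items in the range.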
func (*SampleMessage) Count
func (m *SampleMessage) Count() int
func (*SampleMessage) DecodeScale
func (t *SampleMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*SampleMessage) EncodeScale
func (t *SampleMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*SampleMessage) Fingerprint
func (m *SampleMessage) Fingerprint() Fingerprint
func (*SampleMessage) Keys
func (m *SampleMessage) Keys() []KeyBytes
func (*SampleMessage) Sample
func (m *SampleMessage) Sample() []MinhashSampleItem
func (*SampleMessage) Since
func (m *SampleMessage) Since() time.Time
func (*SampleMessage) Type
func (m *SampleMessage) Type() MessageType
func (*SampleMessage) X
func (m *SampleMessage) X() KeyBytes
func (*SampleMessage) Y
func (m *SampleMessage) Y() KeyBytes
type Seq
Seq represents an ordered sequence of elements. Most sequences are finite. Infinite sequences are explicitly mentioned in the documentation of functions/methods that return them.
func (Seq) Collect
Collect returns all elements in the sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.
func (Seq) First
First returns the first element from the sequence, if any. If the sequence is empty, it returns nil.
func (Seq) MarshalLogArray
func (s Seq) MarshalLogArray(enc zapcore.ArrayEncoder) error
MarshalLogArray implements zapcore.ArrayMarshaler.
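Assuming Seq is a push-style iterator in the shape of Go 1.23's `iter.Seq` (a function that calls yield for each element until yield returns false), Collect and First could be built on it as in this sketch. `keyBytes`, `seq`, `fromSlice`, and `cycle` are hypothetical stand-ins, not the package's types. The infinite `cycle` sequence shows why First stops after one element while Collect must only be used on finite sequences.

```go
package main

import "fmt"

// keyBytes and seq are hypothetical stand-ins for KeyBytes and Seq.
type keyBytes []byte

type seq func(yield func(keyBytes) bool)

// fromSlice wraps a slice into a finite sequence.
func fromSlice(items []keyBytes) seq {
	return func(yield func(keyBytes) bool) {
		for _, k := range items {
			if !yield(k) {
				return
			}
		}
	}
}

// cycle returns an infinite sequence repeating items (empty if items is empty).
func cycle(items []keyBytes) seq {
	return func(yield func(keyBytes) bool) {
		for len(items) > 0 {
			for _, k := range items {
				if !yield(k) {
					return
				}
			}
		}
	}
}

// Collect gathers all elements; it never returns on an infinite sequence.
func (s seq) Collect() []keyBytes {
	var r []keyBytes
	s(func(k keyBytes) bool {
		r = append(r, k)
		return true
	})
	return r
}

// First returns the first element, or nil if the sequence is empty.
// It stops iteration right away, so it is safe on infinite sequences.
func (s seq) First() keyBytes {
	var first keyBytes
	s(func(k keyBytes) bool {
		first = k
		return false
	})
	return first
}

func main() {
	s := fromSlice([]keyBytes{{0x01}, {0x02}})
	fmt.Println(s.Collect())                       // [[1] [2]]
	fmt.Println(s.First())                         // [1]
	fmt.Println(cycle([]keyBytes{{0x07}}).First()) // [7]
	fmt.Println(fromSlice(nil).First() == nil)     // true
}
```

With Go 1.23 or later, a function type of this shape can also be consumed directly in a `for k := range s` loop.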
type SeqErrorFunc
type SeqErrorFunc func() error
SeqErrorFunc is a function that returns an error that happened during iteration, if any.
var NoSeqError SeqErrorFunc = func() error { return nil }
NoSeqError is a SeqErrorFunc that always returns nil (no error).
func SeqError
func SeqError(err error) SeqErrorFunc
SeqError returns a SeqErrorFunc that always returns the given error.
type SeqResult
type SeqResult struct {
Seq Seq
Error SeqErrorFunc
}
SeqResult represents the result of a function that returns a sequence. The Error function must be called after processing the sequence to check whether an error occurred. Error is reset at the beginning of each Seq call (iteration over the sequence).
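The iterate-then-check-Error convention can be sketched as follows. `seqResult`, `errFunc`, `readKeys`, and `collect` are hypothetical simplified stand-ins (keys are strings and Seq is a push iterator); `readKeys` simulates a source that fails partway through and resets its error at the start of every iteration, mirroring the behavior described above.

```go
package main

import (
	"errors"
	"fmt"
)

// errFunc and seqResult are hypothetical stand-ins for SeqErrorFunc and SeqResult.
type errFunc func() error

type seqResult struct {
	Seq   func(yield func(string) bool)
	Error errFunc
}

// readKeys yields keys until index failAt, where it records an error
// (pass -1 to never fail). The error is reset at the start of each iteration.
func readKeys(keys []string, failAt int) seqResult {
	var err error
	return seqResult{
		Seq: func(yield func(string) bool) {
			err = nil
			for i, k := range keys {
				if i == failAt {
					err = errors.New("read failed")
					return
				}
				if !yield(k) {
					return
				}
			}
		},
		Error: func() error { return err },
	}
}

// collect consumes the whole sequence and only then checks Error.
func collect(sr seqResult) ([]string, error) {
	var out []string
	sr.Seq(func(k string) bool {
		out = append(out, k)
		return true
	})
	if err := sr.Error(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	fmt.Println(collect(readKeys([]string{"a", "b", "c"}, -1))) // [a b c] <nil>
	fmt.Println(collect(readKeys([]string{"a", "b", "c"}, 1)))  // [] read failed
}
```

Because the error is only known once iteration stops, checking Error before consuming the sequence would miss it.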
func CombineSeqs added in v1.7.7
CombineSeqs combines multiple ordered sequences from SeqResults into one, returning the smallest current key among all iterators at each step. startingPoint is used to check if an iterator has wrapped around. If an iterator yields a value below startingPoint, it is considered to have wrapped around.
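The merging rule can be illustrated with slices instead of iterators. In this hypothetical sketch (`combine`, `wrapped`, and `less` are illustrative names; the real function works on SeqResult values and may differ in detail), each input is an ordered sequence that starts at or after startingPoint and may wrap around to smaller keys. Keys that have not wrapped sort before keys that have, and within each group the smallest key wins. The sketch does not deduplicate keys that appear in several inputs.

```go
package main

import (
	"bytes"
	"fmt"
)

// wrapped reports whether key lies below startingPoint, i.e. the
// sequence that produced it has wrapped around.
func wrapped(key, startingPoint []byte) bool {
	return bytes.Compare(key, startingPoint) < 0
}

// less orders keys relative to startingPoint: unwrapped keys first,
// then wrapped ones, each group in ascending order.
func less(a, b, startingPoint []byte) bool {
	wa, wb := wrapped(a, startingPoint), wrapped(b, startingPoint)
	if wa != wb {
		return !wa
	}
	return bytes.Compare(a, b) < 0
}

// combine performs a k-way merge, repeatedly taking the smallest
// current head among all inputs.
func combine(startingPoint []byte, inputs ...[][]byte) [][]byte {
	pos := make([]int, len(inputs))
	var out [][]byte
	for {
		best := -1
		for i, in := range inputs {
			if pos[i] >= len(in) {
				continue
			}
			if best < 0 || less(in[pos[i]], inputs[best][pos[best]], startingPoint) {
				best = i
			}
		}
		if best < 0 {
			return out
		}
		out = append(out, inputs[best][pos[best]])
		pos[best]++
	}
}

func main() {
	sp := []byte{5}
	a := [][]byte{{5}, {9}, {1}} // wraps around after 9
	b := [][]byte{{7}, {2}, {4}} // wraps around after 7
	fmt.Println(combine(sp, a, b)) // [[5] [7] [9] [1] [2] [4]]
}
```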
func EmptySeqResult
func EmptySeqResult() SeqResult
EmptySeqResult returns an empty sequence result.
func ErrorSeqResult
ErrorSeqResult returns a sequence result with an empty sequence and an error.
func MakeSeqResult added in v1.7.9
MakeSeqResult makes a SeqResult out of a slice.
func (SeqResult) Collect
Collect returns all elements in the result's sequence as a slice. It may not be very efficient due to reallocations, and thus it should only be used for small sequences or for testing.
func (SeqResult) First
First returns the first element from the result's sequence, if any. If the sequence is empty, it returns nil.
func (SeqResult) IsEmpty added in v1.7.9
IsEmpty returns true if the sequence in SeqResult is empty. It also checks for errors.
func (SeqResult) MarshalLogArray
func (s SeqResult) MarshalLogArray(enc zapcore.ArrayEncoder) error
MarshalLogArray implements zapcore.ArrayMarshaler.
type SplitInfo
type SplitInfo struct {
// The two parts of the range
Parts [2]RangeInfo
// Middle point between the ranges
Middle KeyBytes
}
SplitInfo contains information about a range split in two.
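As an illustration of what a split might produce, the sketch below divides the items of a range (in iteration order) at their midpoint and returns both halves with their counts and XOR fingerprints, plus the middle key, so the parts cover [x, middle) and [middle, y). Keys are simplified to uint64 values and fingerprints to their XOR; `part`, `splitInfo`, `makePart`, and `split` are hypothetical, and the package may choose the split point differently.

```go
package main

import "fmt"

// part and splitInfo are hypothetical stand-ins for RangeInfo and SplitInfo.
type part struct {
	Fingerprint uint64
	Count       int
	Items       []uint64
}

type splitInfo struct {
	Parts  [2]part
	Middle uint64
}

// makePart computes the count and XOR fingerprint of a run of items.
func makePart(items []uint64) part {
	p := part{Count: len(items), Items: items}
	for _, k := range items {
		p.Fingerprint ^= k
	}
	return p
}

// split halves the items of a range; Middle is the first key of the
// second part. At least two items are needed to produce two parts.
func split(items []uint64) (splitInfo, error) {
	if len(items) < 2 {
		return splitInfo{}, fmt.Errorf("cannot split %d item(s)", len(items))
	}
	m := len(items) / 2
	return splitInfo{
		Parts:  [2]part{makePart(items[:m]), makePart(items[m:])},
		Middle: items[m],
	}, nil
}

func main() {
	si, err := split([]uint64{1, 3, 5, 8, 13})
	if err != nil {
		panic(err)
	}
	fmt.Println(si.Middle, si.Parts[0].Count, si.Parts[1].Count) // 5 2 3
	// The parts' fingerprints XOR back to the fingerprint of the whole range.
	fmt.Println(si.Parts[0].Fingerprint ^ si.Parts[1].Fingerprint) // 2
}
```

Because XOR is associative, the two part fingerprints always combine into the fingerprint of the original range, which lets each half be compared with the peer independently.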
type SyncMessage
type SyncMessage interface {
// Type returns the type of the message.
Type() MessageType
// X returns the beginning of the range.
X() KeyBytes
// Y returns the end of the range.
Y() KeyBytes
// Fingerprint returns the fingerprint of the range.
Fingerprint() Fingerprint
// Count returns the number of items in the range.
Count() int
// Keys returns the keys of the items in the range.
Keys() []KeyBytes
// Since returns the point in time from which recent items are being sent.
Since() time.Time
// Sample returns the minhash sample of the items in the range.
Sample() []MinhashSampleItem
}
SyncMessage is a message that is a part of the sync protocol.