sync

package
v0.0.0-...-956700b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 4, 2025 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const SnapHistoryIndexFreshness = time.Millisecond * 10

Variables

View Source
var ErrSnapshot = errors.New("snapshot error")

Functions

This section is empty.

Types

type ErrSync

type ErrSync struct {
	Publisher enc.Name
	BootTime  uint64
	Err       error
}

func (*ErrSync) Error

func (e *ErrSync) Error() string

Returns a formatted error message for a sync error, including the publisher, boot time, and underlying error.

func (*ErrSync) Unwrap

func (e *ErrSync) Unwrap() error

Returns the underlying error wrapped by this ErrSync instance, enabling error unwrapping as part of Go's error handling pattern.

type SimplePs

type SimplePs[V any] struct {
	// contains filtered or unexported fields
}

SimplePs is a simple Pub/Sub system.

func NewSimplePs

func NewSimplePs[V any]() SimplePs[V]

Constructs a new SimplePs instance with an empty map for managing subscribers and producers of type V.

func (*SimplePs[V]) HasSub

func (ps *SimplePs[V]) HasSub(prefix enc.Name) bool

Returns true if there are existing subscriptions for the given prefix.

func (*SimplePs[V]) Publish

func (ps *SimplePs[V]) Publish(name enc.Name, data V)

Publishes the given data to all subscribers associated with the specified name by invoking their callback functions.

func (*SimplePs[V]) Subs

func (ps *SimplePs[V]) Subs(prefix enc.Name) iter.Seq[func(V)]

Returns an iterator over subscriber callbacks whose registered prefix matches the given name prefix.

func (*SimplePs[V]) Subscribe

func (ps *SimplePs[V]) Subscribe(prefix enc.Name, callback func(V)) error

Registers a callback function to be invoked when data matching the specified prefix is published, storing the subscription in the internal map and panicking if the callback is nil.

func (*SimplePs[V]) Unsubscribe

func (ps *SimplePs[V]) Unsubscribe(prefix enc.Name)

Removes a subscription to the specified prefix by deleting the corresponding entry from the internal subscription map using its TLV-encoded string representation.

type SimplePsSub

type SimplePsSub[V any] struct {
	// Prefix is the name prefix to subscribe.
	Prefix enc.Name
	// Callback is the callback function.
	Callback func(V)
}

type Snapshot

type Snapshot interface {
	// Snapshot returns the Snapshot trait.
	Snapshot() Snapshot
	// contains filtered or unexported methods
}

type SnapshotNodeHistory

type SnapshotNodeHistory struct {
	// Client is the object client.
	Client ndn.Client
	// Threshold is the number of updates before a snapshot is taken.
	Threshold uint64

	// Compress is the optional callback to compress the history snapshot.
	//
	// 1. The snapshot should be compressed in place.
	// 2. If grouping is used, the last sequence number of a group should be kept.
	//    Earlier sequence numbers can then be removed.
	// 3. #2 implies that the last sequence number cannot be removed.
	//
	// For example, for snapshot 1, [2, 3, 4], (5), 6, (7), 8, 9, 10:
	//  - [VALID] 1, [234], 6, (57), 8, 10
	//  - [INVALID] 1, 4[234], 6, 7(57), 8, 9, 10
	//  - [INVALID] 1, 3[234], 6, 7(57), 8, 9, 10
	//  - [INVALID] 1, 4[234], 5(57), 6, 8, 9, 10
	//  - [INVALID] 1, 4[234], 6, 8, 9, 10
	Compress func(*svs_ps.HistorySnap)

	// In Repo mode, all snapshots are fetched automtically for persistence.
	IsRepo bool
	// IgnoreValidity ignores validity period in the validation chain
	IgnoreValidity optional.Optional[bool]
	// contains filtered or unexported fields
}

SnapshotNodeLatest is a snapshot strategy that assumes that it is not possible to take a snapshot of the application state. Instead, it creates a snapshot of the entire publication history.

This strategy should be used with highly persistent storage, as it will store all publications since the node bootstrapped, and fetch publications from a node's previous instances (bootstraps). To ensure that publications from previous instances are available, the application must use NDN Repo.

func (*SnapshotNodeHistory) Snapshot

func (s *SnapshotNodeHistory) Snapshot() Snapshot

Returns the current SnapshotNodeHistory instance as a Snapshot interface.

func (*SnapshotNodeHistory) String

func (s *SnapshotNodeHistory) String() string

Returns a string representation of the SnapshotNodeHistory type, which is "snapshot-node-history".

type SnapshotNodeLatest

type SnapshotNodeLatest struct {
	// Client is the object client.
	Client ndn.Client

	// SnapMe is the callback to get a snapshot of the application state.
	//
	// The state should encode the entire state of the node, and should replace
	// any previous publications completely. If this snapshot is delivered to a
	// node, previous publications will be ignored by the receiving node.
	//
	// The callback is passed the name of the snapshot that will be created.
	SnapMe func(enc.Name) (enc.Wire, error)
	// Threshold is the number of updates before a snapshot is taken.
	Threshold uint64
	// IgnoreValidity ignores validity period in the validation chain
	IgnoreValidity optional.Optional[bool]
	// contains filtered or unexported fields
}

SnapshotNodeLatest is a snapshot strategy that takes a snapshot of the application state whenever a certain number of updates have been made.

Each snapshot is treated as self-contained and replaces any previous publications completely. Only the latest (hence the name) snapshot is fetched by other nodes, and previous publications are ignored.

When a node bootstraps again, this strategy assumes that the previous state is now invalid and fetches the latest snapshot.

func (*SnapshotNodeLatest) Snapshot

func (s *SnapshotNodeLatest) Snapshot() Snapshot

Returns the current SnapshotNodeLatest instance as a Snapshot interface.

func (*SnapshotNodeLatest) String

func (s *SnapshotNodeLatest) String() string

Returns a string representation of the SnapshotNodeLatest instance, identifying it as "snapshot-node-latest".

type SnapshotNull

type SnapshotNull struct {
}

SnapshotNull is a non-snapshot strategy.

func (*SnapshotNull) Snapshot

func (s *SnapshotNull) Snapshot() Snapshot

Returns the current snapshot instance without modification, indicating this object is already a snapshot.

type SvMap

type SvMap[V any] map[string][]SvMapVal[V]

Map representation of the state vector.

func NewSvMap

func NewSvMap[V any](size int) SvMap[V]

Create a new state vector map.

func (SvMap[V]) Clear

func (m SvMap[V]) Clear()

Clears all entries from the SvMap if it is not nil, preventing panic when the map is nil.

func (SvMap[V]) Encode

func (m SvMap[V]) Encode(seq func(V) uint64) *spec_svs.StateVector

Encode the state vector map to a StateVector. seq is the function to get the sequence number

func (SvMap[V]) Get

func (m SvMap[V]) Get(hash string, boot uint64) (value V)

Get seq entry for a bootstrap time.

func (SvMap[V]) IsNewerThan

func (m SvMap[V]) IsNewerThan(other SvMap[V], cmp func(V, V) bool) bool

Check if a SvMap is newer than another. cmp(a, b) is the function to compare values (a > b).

func (SvMap[V]) Iter

func (m SvMap[V]) Iter() iter.Seq2[enc.Name, []SvMapVal[V]]

Returns an iterator that converts TLV-encoded name strings in the SvMap to enc.Name objects and yields each key-value pair, skipping and logging invalid names.

func (SvMap[V]) Set

func (m SvMap[V]) Set(hash string, boot uint64, value V)

Sets the value for the specified hash and boot time in the SvMap, maintaining sorted order of entries by boot time through binary search and insertion.

type SvMapVal

type SvMapVal[V any] struct {
	Boot  uint64
	Value V
}

One entry in the state vector map.

func (*SvMapVal[V]) Cmp

func (*SvMapVal[V]) Cmp(a, b SvMapVal[V]) int

Compares two SvMapVal values by their Boot field, returning -1, 0, or 1 if a's Boot is less than, equal to, or greater than b's Boot.

type SvSync

type SvSync struct {
	// contains filtered or unexported fields
}

func NewSvSync

func NewSvSync(opts SvSyncOpts) *SvSync

NewSvSync creates a new SV Sync instance.

func (*SvSync) GetBootTime

func (s *SvSync) GetBootTime() uint64

Returns the boot time of the SvSync instance as a 64-bit unsigned integer.

func (*SvSync) GetNames

func (s *SvSync) GetNames() []enc.Name

Returns a slice containing all names currently stored in the synchronization state, ensuring thread-safety through mutual exclusion during access.

func (*SvSync) GetSeqNo

func (s *SvSync) GetSeqNo(name enc.Name) uint64

GetSeqNo returns the sequence number for a name.

func (*SvSync) IncrSeqNo

func (s *SvSync) IncrSeqNo(name enc.Name) uint64

IncrSeqNo increments the sequence number for a name. The instance must only increment sequence numbers for names it owns.

func (*SvSync) SetSeqNo

func (s *SvSync) SetSeqNo(name enc.Name, seqNo uint64) error

SetSeqNo sets the sequence number for a name. The instance must only set sequence numbers for names it owns. The sequence number must be greater than the previous value.

func (*SvSync) Start

func (s *SvSync) Start() (err error)

Start the SV Sync instance.

func (*SvSync) Stop

func (s *SvSync) Stop() error

Stop the SV Sync instance.

func (*SvSync) String

func (s *SvSync) String() string

Instance log identifier

type SvSyncOpts

type SvSyncOpts struct {
	// NDN Object API client
	Client ndn.Client
	// Sync group prefix for the SVS group
	GroupPrefix enc.Name
	// Callback for SVSync updates
	OnUpdate func(SvSyncUpdate)

	// Name of this instance for security
	// This name will be used directly for the Sync Data name;
	// only a version component will be appended.
	// If not provided, the GroupPrefix will be used instead.
	SyncDataName enc.Name

	// Initial state vector from persistence
	InitialState *spec_svs.StateVector
	// Boot time from persistence
	BootTime uint64
	// Periodic timeout for sending Sync Interests (default 30s)
	PeriodicTimeout time.Duration
	// Suppression period for ignoring outdated Sync Interests (default 200ms)
	SuppressionPeriod time.Duration

	// Passive mode does not send sign Sync Interests
	Passive bool
	// IgnoreValidity ignores validity period in the validation chain
	IgnoreValidity optional.Optional[bool]
}

type SvSyncUpdate

type SvSyncUpdate struct {
	Name enc.Name
	Boot uint64
	High uint64
	Low  uint64
}

type SvsALO

type SvsALO struct {
	// contains filtered or unexported fields
}

SvsALO is a Sync Transport with At Least One delivery semantics.

func NewSvsALO

func NewSvsALO(opts SvsAloOpts) (*SvsALO, error)

NewSvsALO creates a new SvsALO instance.

func (*SvsALO) BootTime

func (s *SvsALO) BootTime() uint64

BootTime returns the boot time of the instance.

func (*SvsALO) DataPrefix

func (s *SvsALO) DataPrefix() enc.Name

DataPrefix is the data route prefix for this instance.

func (*SvsALO) GroupPrefix

func (s *SvsALO) GroupPrefix() enc.Name

GroupPrefix is the group prefix for this instance.

func (*SvsALO) Publish

func (s *SvsALO) Publish(content enc.Wire) (name enc.Name, state enc.Wire, err error)

Publish sends a message to the group

func (*SvsALO) SVS

func (s *SvsALO) SVS() *SvSync

SVS returns the underlying SVS instance.

func (*SvsALO) SeqNo

func (s *SvsALO) SeqNo() uint64

SeqNo returns the current sequence number of the instance.

func (*SvsALO) SetOnError

func (s *SvsALO) SetOnError(callback func(error))

SetOnError sets the error callback. You can likely cast the received error as SyncError.

SyncError includes the name of the affected publisher and the error. Applications can use this callback to selectively unsubscribe from publishers that are not responding.

func (*SvsALO) SetOnPublisher

func (s *SvsALO) SetOnPublisher(callback func(enc.Name))

SetOnPublisher sets the publisher callback.

This will be called when an update from a new publisher is received. This includes both updates for publishers that are already subscribed and other non-subscribed publishers. Applications can use this callback to test the liveness of publishers and selectively subscribe to them.

func (*SvsALO) Start

func (s *SvsALO) Start() error

Start starts the SvsALO instance.

func (*SvsALO) Stop

func (s *SvsALO) Stop() error

Stop stops the SvsALO instance.

func (*SvsALO) String

func (s *SvsALO) String() string

String is the log identifier.

func (*SvsALO) SubscribePublisher

func (s *SvsALO) SubscribePublisher(prefix enc.Name, callback func(SvsPub)) error

SubscribePublisher subscribes to all publishers matchin a name prefix. Only one subscriber per prefix is allowed. If the prefix is already subscribed, the callback is replaced.

func (*SvsALO) SyncPrefix

func (s *SvsALO) SyncPrefix() enc.Name

SyncPrefix is the sync route prefix for this instance.

func (*SvsALO) UnsubscribePublisher

func (s *SvsALO) UnsubscribePublisher(prefix enc.Name)

UnsubscribePublisher unsubscribes removes callbacks added with subscribe. The callback may still receive messages for some time after this call. The application must handle these messages correctly.

type SvsAloOpts

type SvsAloOpts struct {
	// Name is the name of this instance producer.
	Name enc.Name
	// Svs is the options for the underlying SVS instance.
	Svs SvSyncOpts
	// Snapshot is the snapshot strategy.
	Snapshot Snapshot
	// InitialState is the initial state of the instance.
	InitialState enc.Wire

	// MaxPipelineSize is the number of objects to fetch
	// concurrently for a single publisher (default 10)
	MaxPipelineSize uint64

	// MulticastPrefix is a prefix to prepend to Sync Interests
	MulticastPrefix enc.Name
}

type SvsPub

type SvsPub struct {
	// Publisher that produced the data.
	Publisher enc.Name
	// DataName is the name of the data.
	DataName enc.Name
	// Content of the data publication.
	Content enc.Wire
	// Boot time of the publisher.
	BootTime uint64
	// Sequence number of the publisher.
	SeqNum uint64
	// IsSnapshot is true if this is a snapshot.
	IsSnapshot bool
	// State is the state after this publication is applied.
	State enc.Wire
	// contains filtered or unexported fields
}

SvsPub is the generic received data publication from SVS.

func (*SvsPub) Bytes

func (p *SvsPub) Bytes() []byte

Bytes gets the bytes of the data publication.

This will allocate a new byte slice and copy the content. Using Content directly is more efficient whenever possible.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL