Documentation
¶
Index ¶
- Constants
- Variables
- type ErrSync
- type SimplePs
- type SimplePsSub
- type Snapshot
- type SnapshotNodeHistory
- type SnapshotNodeLatest
- type SnapshotNull
- type SvMap
- func (m SvMap[V]) Clear()
- func (m SvMap[V]) Encode(seq func(V) uint64) *spec_svs.StateVector
- func (m SvMap[V]) Get(hash string, boot uint64) (value V)
- func (m SvMap[V]) IsNewerThan(other SvMap[V], cmp func(V, V) bool) bool
- func (m SvMap[V]) Iter() iter.Seq2[enc.Name, []SvMapVal[V]]
- func (m SvMap[V]) Set(hash string, boot uint64, value V)
- type SvMapVal
- type SvSync
- func (s *SvSync) GetBootTime() uint64
- func (s *SvSync) GetNames() []enc.Name
- func (s *SvSync) GetSeqNo(name enc.Name) uint64
- func (s *SvSync) IncrSeqNo(name enc.Name) uint64
- func (s *SvSync) SetSeqNo(name enc.Name, seqNo uint64) error
- func (s *SvSync) Start() (err error)
- func (s *SvSync) Stop() error
- func (s *SvSync) String() string
- type SvSyncOpts
- type SvSyncUpdate
- type SvsALO
- func (s *SvsALO) BootTime() uint64
- func (s *SvsALO) DataPrefix() enc.Name
- func (s *SvsALO) GroupPrefix() enc.Name
- func (s *SvsALO) Publish(content enc.Wire) (name enc.Name, state enc.Wire, err error)
- func (s *SvsALO) SVS() *SvSync
- func (s *SvsALO) SeqNo() uint64
- func (s *SvsALO) SetOnError(callback func(error))
- func (s *SvsALO) SetOnPublisher(callback func(enc.Name))
- func (s *SvsALO) Start() error
- func (s *SvsALO) Stop() error
- func (s *SvsALO) String() string
- func (s *SvsALO) SubscribePublisher(prefix enc.Name, callback func(SvsPub)) error
- func (s *SvsALO) SyncPrefix() enc.Name
- func (s *SvsALO) UnsubscribePublisher(prefix enc.Name)
- type SvsAloOpts
- type SvsPub
Constants ¶
const SnapHistoryIndexFreshness = time.Millisecond * 10
Variables ¶
var ErrSnapshot = errors.New("snapshot error")
Functions ¶
This section is empty.
Types ¶
type ErrSync ¶
type SimplePs ¶
type SimplePs[V any] struct { // contains filtered or unexported fields }
SimplePs is a simple Pub/Sub system.
func NewSimplePs ¶
Constructs a new SimplePs instance with an empty map for managing subscribers and producers of type V.
func (*SimplePs[V]) Publish ¶
Publishes the given data to all subscribers associated with the specified name by invoking their callback functions.
func (*SimplePs[V]) Subs ¶
Returns an iterator over subscriber callbacks whose registered prefix matches the given name prefix.
func (*SimplePs[V]) Subscribe ¶
Registers a callback function to be invoked when data matching the specified prefix is published, storing the subscription in the internal map and panicking if the callback is nil.
func (*SimplePs[V]) Unsubscribe ¶
Removes a subscription to the specified prefix by deleting the corresponding entry from the internal subscription map using its TLV-encoded string representation.
type SimplePsSub ¶
type Snapshot ¶
type Snapshot interface { // Snapshot returns the Snapshot trait. Snapshot() Snapshot // contains filtered or unexported methods }
type SnapshotNodeHistory ¶
type SnapshotNodeHistory struct { // Client is the object client. Client ndn.Client // Threshold is the number of updates before a snapshot is taken. Threshold uint64 // Compress is the optional callback to compress the history snapshot. // // 1. The snapshot should be compressed in place. // 2. If grouping is used, the last sequence number of a group should be kept. // Earlier sequence numbers can then be removed. // 3. #2 implies that the last sequence number cannot be removed. // // For example, for snapshot 1, [2, 3, 4], (5), 6, (7), 8, 9, 10: // - [VALID] 1, [234], 6, (57), 8, 10 // - [INVALID] 1, 4[234], 6, 7(57), 8, 9, 10 // - [INVALID] 1, 3[234], 6, 7(57), 8, 9, 10 // - [INVALID] 1, 4[234], 5(57), 6, 8, 9, 10 // - [INVALID] 1, 4[234], 6, 8, 9, 10 Compress func(*svs_ps.HistorySnap) // In Repo mode, all snapshots are fetched automtically for persistence. IsRepo bool // IgnoreValidity ignores validity period in the validation chain IgnoreValidity optional.Optional[bool] // contains filtered or unexported fields }
SnapshotNodeLatest is a snapshot strategy that assumes that it is not possible to take a snapshot of the application state. Instead, it creates a snapshot of the entire publication history.
This strategy should be used with highly persistent storage, as it will store all publications since the node bootstrapped, and fetch publications from a node's previous instances (bootstraps). To ensure that publications from previous instances are available, the application must use NDN Repo.
func (*SnapshotNodeHistory) Snapshot ¶
func (s *SnapshotNodeHistory) Snapshot() Snapshot
Returns the current SnapshotNodeHistory instance as a Snapshot interface.
func (*SnapshotNodeHistory) String ¶
func (s *SnapshotNodeHistory) String() string
Returns a string representation of the SnapshotNodeHistory type, which is "snapshot-node-history".
type SnapshotNodeLatest ¶
type SnapshotNodeLatest struct { // Client is the object client. Client ndn.Client // SnapMe is the callback to get a snapshot of the application state. // // The state should encode the entire state of the node, and should replace // any previous publications completely. If this snapshot is delivered to a // node, previous publications will be ignored by the receiving node. // // The callback is passed the name of the snapshot that will be created. SnapMe func(enc.Name) (enc.Wire, error) // Threshold is the number of updates before a snapshot is taken. Threshold uint64 // IgnoreValidity ignores validity period in the validation chain IgnoreValidity optional.Optional[bool] // contains filtered or unexported fields }
SnapshotNodeLatest is a snapshot strategy that takes a snapshot of the application state whenever a certain number of updates have been made.
Each snapshot is treated as self-contained and replaces any previous publications completely. Only the latest (hence the name) snapshot is fetched by other nodes, and previous publications are ignored.
When a node bootstraps again, this strategy assumes that the previous state is now invalid and fetches the latest snapshot.
func (*SnapshotNodeLatest) Snapshot ¶
func (s *SnapshotNodeLatest) Snapshot() Snapshot
Returns the current SnapshotNodeLatest instance as a Snapshot interface.
func (*SnapshotNodeLatest) String ¶
func (s *SnapshotNodeLatest) String() string
Returns a string representation of the SnapshotNodeLatest instance, identifying it as "snapshot-node-latest".
type SnapshotNull ¶
type SnapshotNull struct { }
SnapshotNull is a non-snapshot strategy.
func (*SnapshotNull) Snapshot ¶
func (s *SnapshotNull) Snapshot() Snapshot
Returns the current snapshot instance without modification, indicating this object is already a snapshot.
type SvMap ¶
Map representation of the state vector.
func (SvMap[V]) Clear ¶
func (m SvMap[V]) Clear()
Clears all entries from the SvMap if it is not nil, preventing panic when the map is nil.
func (SvMap[V]) Encode ¶
func (m SvMap[V]) Encode(seq func(V) uint64) *spec_svs.StateVector
Encode the state vector map to a StateVector. seq is the function to get the sequence number
func (SvMap[V]) IsNewerThan ¶
Check if a SvMap is newer than another. cmp(a, b) is the function to compare values (a > b).
type SvSync ¶
type SvSync struct {
// contains filtered or unexported fields
}
func (*SvSync) GetBootTime ¶
Returns the boot time of the SvSync instance as a 64-bit unsigned integer.
func (*SvSync) GetNames ¶
Returns a slice containing all names currently stored in the synchronization state, ensuring thread-safety through mutual exclusion during access.
func (*SvSync) IncrSeqNo ¶
IncrSeqNo increments the sequence number for a name. The instance must only increment sequence numbers for names it owns.
type SvSyncOpts ¶
type SvSyncOpts struct { // NDN Object API client Client ndn.Client // Sync group prefix for the SVS group GroupPrefix enc.Name // Callback for SVSync updates OnUpdate func(SvSyncUpdate) // Name of this instance for security // This name will be used directly for the Sync Data name; // only a version component will be appended. // If not provided, the GroupPrefix will be used instead. SyncDataName enc.Name // Initial state vector from persistence InitialState *spec_svs.StateVector // Boot time from persistence BootTime uint64 // Periodic timeout for sending Sync Interests (default 30s) PeriodicTimeout time.Duration // Suppression period for ignoring outdated Sync Interests (default 200ms) SuppressionPeriod time.Duration // Passive mode does not send sign Sync Interests Passive bool // IgnoreValidity ignores validity period in the validation chain IgnoreValidity optional.Optional[bool] }
type SvsALO ¶
type SvsALO struct {
// contains filtered or unexported fields
}
SvsALO is a Sync Transport with At Least One delivery semantics.
func NewSvsALO ¶
func NewSvsALO(opts SvsAloOpts) (*SvsALO, error)
NewSvsALO creates a new SvsALO instance.
func (*SvsALO) DataPrefix ¶
DataPrefix is the data route prefix for this instance.
func (*SvsALO) GroupPrefix ¶
GroupPrefix is the group prefix for this instance.
func (*SvsALO) SetOnError ¶
SetOnError sets the error callback. You can likely cast the received error as SyncError.
SyncError includes the name of the affected publisher and the error. Applications can use this callback to selectively unsubscribe from publishers that are not responding.
func (*SvsALO) SetOnPublisher ¶
SetOnPublisher sets the publisher callback.
This will be called when an update from a new publisher is received. This includes both updates for publishers that are already subscribed and other non-subscribed publishers. Applications can use this callback to test the liveness of publishers and selectively subscribe to them.
func (*SvsALO) SubscribePublisher ¶
SubscribePublisher subscribes to all publishers matchin a name prefix. Only one subscriber per prefix is allowed. If the prefix is already subscribed, the callback is replaced.
func (*SvsALO) SyncPrefix ¶
SyncPrefix is the sync route prefix for this instance.
func (*SvsALO) UnsubscribePublisher ¶
UnsubscribePublisher unsubscribes removes callbacks added with subscribe. The callback may still receive messages for some time after this call. The application must handle these messages correctly.
type SvsAloOpts ¶
type SvsAloOpts struct { // Name is the name of this instance producer. Name enc.Name // Svs is the options for the underlying SVS instance. Svs SvSyncOpts // Snapshot is the snapshot strategy. Snapshot Snapshot // InitialState is the initial state of the instance. InitialState enc.Wire // MaxPipelineSize is the number of objects to fetch // concurrently for a single publisher (default 10) MaxPipelineSize uint64 // MulticastPrefix is a prefix to prepend to Sync Interests MulticastPrefix enc.Name }
type SvsPub ¶
type SvsPub struct { // Publisher that produced the data. Publisher enc.Name // DataName is the name of the data. DataName enc.Name // Content of the data publication. Content enc.Wire // Boot time of the publisher. BootTime uint64 // Sequence number of the publisher. SeqNum uint64 // IsSnapshot is true if this is a snapshot. IsSnapshot bool // State is the state after this publication is applied. State enc.Wire // contains filtered or unexported fields }
SvsPub is the generic received data publication from SVS.