crdt

package
v0.7.1-dev
Published: May 17, 2026 | License: AGPL-3.0, AGPL-3.0-or-later | Imports: 14 | Imported by: 0

Documentation

Constants

const (
	StatsRepoName = "/STATS"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Broadcaster

type Broadcaster interface {
	Broadcast(ctx context.Context, data []byte) error
	Next(ctx context.Context) ([]byte, error)
}

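Broadcaster is the transport interface used for CRDT synchronization: Broadcast sends a payload to peers, and Next returns the next payload received from them.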

type CRDTRouter

type CRDTRouter interface {
	FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo
}

type CRDTStatsStore

type CRDTStatsStore struct {
	// contains filtered or unexported fields
}

CRDTStatsStore implements a PN-counter on top of go-ds-crdt that stays correct under total local-data loss while keeping storage proportional to (nodes × restarts × keys) instead of (operations).

Layout. Every value lives under a key of the shape

/STATS/{incr|decr}/{dataKey}/{nodeID}/{generation}

where {generation} is a fresh 128-bit cryptographic nonce minted once per process start. Each (namespace, dataKey, nodeID, generation) tuple is owned by exactly one writer — the process that minted that generation — so write semantics inside it are trivially safe: we keep the value in memory, bump it under a mutex, and Put the new value to the CRDT. We never read our own value back from the CRDT, so eventual-consistency lag in the local DAG view cannot lose increments.
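The write path described above can be sketched roughly as follows. This is not the package's implementation: `newGeneration`, `subCounterKey`, `writer`, `putter`, and `mapStore` are hypothetical names, the hex encoding of the nonce and the big-endian value encoding are assumptions, and a plain map stands in for the CRDT datastore.

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"sync"
)

// newGeneration mints a fresh 128-bit nonce. Hex encoding is an assumption.
func newGeneration() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return hex.EncodeToString(b[:]), nil
}

// subCounterKey builds /STATS/{incr|decr}/{dataKey}/{nodeID}/{generation}.
func subCounterKey(namespace, dataKey, nodeID, generation string) string {
	return fmt.Sprintf("/STATS/%s/%s/%s/%s", namespace, dataKey, nodeID, generation)
}

// putter is a hypothetical stand-in for the CRDT datastore's Put.
type putter interface {
	Put(key string, value []byte) error
}

// mapStore is an in-memory putter used only for this sketch.
type mapStore map[string][]byte

func (m mapStore) Put(key string, value []byte) error {
	m[key] = value
	return nil
}

// writer is the sole owner of every sub-counter in its generation.
type writer struct {
	mu         sync.Mutex
	nodeID     string
	generation string
	totals     map[string]uint64 // running totals, never read back from the store
	store      putter
}

// bump adds 1 to the in-memory total and persists the new running total.
// Updating memory only after a successful Put is a choice made for this sketch.
func (w *writer) bump(namespace, dataKey string) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	k := subCounterKey(namespace, dataKey, w.nodeID, w.generation)
	next := w.totals[k] + 1
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, next)
	if err := w.store.Put(k, buf); err != nil {
		return err
	}
	w.totals[k] = next
	return nil
}

func main() {
	gen, err := newGeneration()
	if err != nil {
		panic(err)
	}
	store := mapStore{}
	w := &writer{nodeID: "nodeA", generation: gen, totals: map[string]uint64{}, store: store}
	for i := 0; i < 3; i++ {
		if err := w.bump("incr", "downloads"); err != nil {
			panic(err)
		}
	}
	k := subCounterKey("incr", "downloads", "nodeA", gen)
	fmt.Println(k, binary.BigEndian.Uint64(store[k]))
}
```

Because the persisted value is always the full running total rather than a delta, replaying or re-merging the same Put is harmless: the latest total for a given key simply overwrites the previous one.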

GetAggregatedStat sums values across all (nodeID, generation) sub-counters under the prefix and returns `Σ incr − Σ decr` (clamped at zero).
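The aggregation rule can be illustrated with a small sketch. It assumes the local CRDT view has already been decoded into a map from full key to value; the `aggregate` function and the sample keys are hypothetical, and the real method queries the datastore by prefix instead.

```go
package main

import (
	"fmt"
	"strings"
)

// aggregate sums every (nodeID, generation) sub-counter for dataKey and
// returns Σ incr − Σ decr, clamped at zero.
func aggregate(view map[string]uint64, dataKey string) uint64 {
	var incr, decr uint64
	incrPrefix := "/STATS/incr/" + dataKey + "/"
	decrPrefix := "/STATS/decr/" + dataKey + "/"
	for k, v := range view {
		switch {
		case strings.HasPrefix(k, incrPrefix):
			incr += v
		case strings.HasPrefix(k, decrPrefix):
			decr += v
		}
	}
	if decr > incr {
		return 0 // clamp: the counter never goes negative
	}
	return incr - decr
}

func main() {
	view := map[string]uint64{
		"/STATS/incr/downloads/nodeA/gen1": 5, // nodeA before a restart
		"/STATS/incr/downloads/nodeA/gen2": 2, // nodeA after a restart
		"/STATS/incr/downloads/nodeB/gen9": 4,
		"/STATS/decr/downloads/nodeB/gen9": 3,
		"/STATS/incr/uploads/nodeA/gen1":   7, // different dataKey, ignored
	}
	fmt.Println(aggregate(view, "downloads")) // prints 8
}
```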

Crash-safety.

  • Soft crash (process restart, disk intact): the next process boots with a brand-new generation. The previous generation's last-persisted value is still in the CRDT (and on peers); the new generation starts from 0 and is summed into the aggregate on top of the old one. The only loss is any increments or decrements the dying process had buffered locally without persisting or broadcasting them, which is the same fundamental durability boundary as the underlying datastore.
  • Total local-data loss: identical recovery path. Old generations are pulled back via the CRDT DAG from peers; the new generation cannot collide with any of them because its nonce is fresh, so peers' replayed history is preserved verbatim and this process simply accrues a new sub-counter alongside.

func NewCRDTStatsStore

func NewCRDTStatsStore(
	ctx context.Context,
	broadcaster Broadcaster,
	datastore CRDTStorer,
	node host.Host,
	router CRDTRouter,
) (*CRDTStatsStore, error)

NewCRDTStatsStore creates a new CRDT-based statistics store.

func (*CRDTStatsStore) Close

func (s *CRDTStatsStore) Close() error

Close stops the CRDT store.

func (*CRDTStatsStore) Decrement (added in v0.6.4)

func (s *CRDTStatsStore) Decrement(key ds.Key) error

Decrement bumps this process's `decr` sub-counter for key by 1 and persists the new running total to the CRDT.

func (*CRDTStatsStore) GetAggregatedStat

func (s *CRDTStatsStore) GetAggregatedStat(key ds.Key) (uint64, error)

GetAggregatedStat returns the cluster-wide PN-counter for key. The sum is taken across every (nodeID, generation) sub-counter that has been merged into the local CRDT view.

func (*CRDTStatsStore) Increment (added in v0.6.4)

func (s *CRDTStatsStore) Increment(key ds.Key) error

Increment bumps this process's `incr` sub-counter for key by 1 and persists the new running total to the CRDT.

type CRDTStorer

type CRDTStorer interface {
	ds.Datastore
}

type GossipBroadcaster

type GossipBroadcaster struct {
	// contains filtered or unexported fields
}

GossipBroadcaster adapts Gossip to the CRDT Broadcaster interface.

func NewGossipBroadcaster

func NewGossipBroadcaster(ctx context.Context, gossip GossipPubSuber) (*GossipBroadcaster, error)

NewGossipBroadcaster creates a new Gossip-based broadcaster for CRDT.

func (*GossipBroadcaster) Broadcast

func (gb *GossipBroadcaster) Broadcast(_ context.Context, data []byte) error

Broadcast sends data via Gossip.

gb.gossip and gb.topic are set once at construction and never mutated, so this method intentionally does NOT take gb.mx — otherwise a slow PublishRaw (network I/O) would block close() and Receive() through the same lock, defeating the deadlock fix in Receive().

func (*GossipBroadcaster) Next

func (gb *GossipBroadcaster) Next(ctx context.Context) ([]byte, error)

Next receives broadcast data.

func (*GossipBroadcaster) Receive

func (gb *GossipBroadcaster) Receive(data []byte)

Receive is called by the Gossip subscription handler to deliver data.

All channel operations are non-blocking. If the buffer is full, the oldest pending message is dropped and the new one is enqueued. The previous implementation did `<-gb.dataChan` unconditionally inside the `default` branch, which could deadlock under the held mutex when a concurrent Next() drained the channel between the select decision and the receive.

The `closed` flag and `close()` taking the same mutex prevent a "send on closed channel" panic when Next() shuts the broadcaster down concurrently with an in-flight Receive.
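The drop-oldest, never-blocking pattern described above can be sketched like this. The `inbox` type and its field names are hypothetical and do not reflect the actual GossipBroadcaster internals; the sketch only shows one way to combine a mutex, a `closed` flag, and non-blocking selects so that neither a deadlock nor a send on a closed channel can occur.

```go
package main

import (
	"fmt"
	"sync"
)

// inbox is a hypothetical bounded mailbox with drop-oldest semantics.
type inbox struct {
	mu     sync.Mutex
	ch     chan []byte
	closed bool
}

func newInbox(size int) *inbox {
	return &inbox{ch: make(chan []byte, size)}
}

// receive never blocks, even while holding the mutex.
func (b *inbox) receive(data []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return // avoids "send on closed channel"
	}
	// Fast path: there is room in the buffer.
	select {
	case b.ch <- data:
		return
	default:
	}
	// Buffer full: try to discard the oldest message. This is non-blocking
	// because a concurrent reader may already have drained the channel.
	select {
	case <-b.ch:
	default:
	}
	// Senders are serialized by the mutex, so a buffered channel has room
	// now; the default branch only guards against blocking regardless.
	select {
	case b.ch <- data:
	default:
	}
}

// close marks the inbox closed under the same mutex that receive takes.
func (b *inbox) close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if !b.closed {
		b.closed = true
		close(b.ch)
	}
}

func main() {
	b := newInbox(2)
	b.receive([]byte("a"))
	b.receive([]byte("b"))
	b.receive([]byte("c")) // buffer full: "a" is dropped
	fmt.Println(string(<-b.ch), string(<-b.ch)) // prints: b c
	b.close()
	b.receive([]byte("d")) // ignored, no panic
}
```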

type GossipPubSuber

type GossipPubSuber interface {
	PublishRaw(topicName string, data []byte) error
	SubscribeRaw(topicName string, h func([]byte) error) error
}

GossipPubSuber is the interface for publishing raw messages to, and subscribing to raw messages on, a Gossip topic.
