Documentation
¶
Index ¶
Constants ¶
const (
StatsRepoName = "/STATS"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broadcaster ¶
type Broadcaster interface {
Broadcast(ctx context.Context, data []byte) error
Next(ctx context.Context) ([]byte, error)
}
Broadcaster interface for CRDT synchronization
type CRDTRouter ¶
type CRDTStatsStore ¶
type CRDTStatsStore struct {
// contains filtered or unexported fields
}
CRDTStatsStore implements a PN-counter on top of go-ds-crdt that stays correct under total local-data loss while keeping storage proportional to (nodes × restarts × keys) instead of (operations).
Layout. Every value lives under a key of the shape
/STATS/{incr|decr}/{dataKey}/{nodeID}/{generation}
where {generation} is a fresh 128-bit cryptographic nonce minted once per process start. Each (namespace, dataKey, nodeID, generation) tuple is owned by exactly one writer — the process that minted that generation — so write semantics inside it are trivially safe: we keep the value in memory, bump it under a mutex, and Put the new value to the CRDT. We never read our own value back from the CRDT, so eventual-consistency lag in the local DAG view cannot lose increments.
GetAggregatedStat sums values across all (nodeID, generation) sub-counters under the prefix and returns `Σ incr − Σ decr` (clamped at zero).
Crash-safety.
- Soft crash (process restart, disk intact): the next process boots with a brand-new generation. The previous generation's last-persisted value is still in the CRDT (and on peers); the new generation starts from 0 and is summed into the aggregate on top of the old one. The only loss is whatever +1's the dying process buffered locally without persisting/broadcasting — same fundamental durability boundary as the underlying datastore.
- Total local-data loss: identical recovery path. Old generations are pulled back via the CRDT DAG from peers; the new generation cannot collide with any of them because its nonce is fresh, so peers' replayed history is preserved verbatim and this process simply accrues a new sub-counter alongside.
func NewCRDTStatsStore ¶
func NewCRDTStatsStore( ctx context.Context, broadcaster Broadcaster, datastore CRDTStorer, node host.Host, router CRDTRouter, ) (*CRDTStatsStore, error)
NewCRDTStatsStore creates a new CRDT-based statistics store
func (*CRDTStatsStore) Decrement ¶ added in v0.6.4
func (s *CRDTStatsStore) Decrement(key ds.Key) error
Decrement bumps this process's `decr` sub-counter for key by 1 and persists the new running total to the CRDT.
func (*CRDTStatsStore) GetAggregatedStat ¶
func (s *CRDTStatsStore) GetAggregatedStat(key ds.Key) (uint64, error)
GetAggregatedStat returns the cluster-wide PN-counter for key. Sum is taken across every (nodeID, generation) sub-counter that has been merged into the local CRDT view.
type CRDTStorer ¶
type GossipBroadcaster ¶
type GossipBroadcaster struct {
// contains filtered or unexported fields
}
GossipBroadcaster adapts Gossip to CRDT Broadcaster interface
func NewGossipBroadcaster ¶
func NewGossipBroadcaster(ctx context.Context, gossip GossipPubSuber) (*GossipBroadcaster, error)
NewGossipBroadcaster creates a new Gossip-based broadcaster for CRDT
func (*GossipBroadcaster) Broadcast ¶
func (gb *GossipBroadcaster) Broadcast(_ context.Context, data []byte) error
Broadcast sends data via Gossip.
gb.gossip and gb.topic are set once at construction and never mutated, so this method intentionally does NOT take gb.mx — otherwise a slow PublishRaw (network I/O) would block close() and Receive() through the same lock, defeating the deadlock fix in Receive().
func (*GossipBroadcaster) Next ¶
func (gb *GossipBroadcaster) Next(ctx context.Context) ([]byte, error)
Next receives broadcasted data
func (*GossipBroadcaster) Receive ¶
func (gb *GossipBroadcaster) Receive(data []byte)
Receive is called by Gossip subscription handler to deliver data.
All channel operations are non-blocking. If the buffer is full, the oldest pending message is dropped and the new one is enqueued. The previous implementation did `<-gb.dataChan` unconditionally inside the `default` branch, which could deadlock under the held mutex when a concurrent Next() drained the channel between the select decision and the receive.
The `closed` flag and `close()` taking the same mutex prevent a "send on closed channel" panic when Next() shuts the broadcaster down concurrently with an in-flight Receive.