crdt

package
v0.42.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SyncStep1 is sent by a connecting client: "here is my state vector".
	SyncStep1 = "sync_step1"
	// SyncStep2 is the server response: "here are ops you are missing".
	SyncStep2 = "sync_step2"
	// SyncUpdate is an incremental update (ops broadcast).
	SyncUpdate = "sync_update"
)

SyncMessage types for the state-vector based sync protocol.

Variables

View Source
var ErrDocIDReplay = fmt.Errorf("crdt: envelope bound to a different document")

ErrDocIDReplay is returned when OpenOps unwraps an envelope whose authenticated docID field does not match the current document.

View Source
var ErrPrivacyMismatch = fmt.Errorf("crdt: privacy backend mismatch")

ErrPrivacyMismatch is returned when an OpEnvelope's PrivacyTag does not match the document's privacy backend.

Functions

func AnchorBackground added in v0.40.2

func AnchorBackground(ctx context.Context, doc *Document, anchorer Anchorer, cfg AnchorConfig)

AnchorBackground runs a background goroutine that periodically anchors the document's state. Cancel the context to stop it.

func AppIDFromName added in v0.40.2

func AppIDFromName(name string) [32]byte

AppIDFromName computes the 32-byte appID as SHA-256 of the given name.

func DocumentMerkleRoot added in v0.40.2

func DocumentMerkleRoot(doc *Document) ([32]byte, error)

DocumentMerkleRoot computes SHA-256 of the gob-encoded document snapshot.

Types

type AgePrivacy added in v0.40.2

type AgePrivacy struct {
	// contains filtered or unexported fields
}

AgePrivacy encrypts every op under a set of age recipients and decrypts with a single identity held by the local replica.

func NewAgePrivacy added in v0.40.2

func NewAgePrivacy(identity age.Identity, recipients []age.Recipient) (*AgePrivacy, error)

NewAgePrivacy constructs the age backend. `identity` is the local replica's secret key; `recipients` is the set of public keys of every device authorised to decrypt the log. The local identity's corresponding recipient should appear in `recipients` so we can round-trip our own ops.

func (*AgePrivacy) DecryptOp added in v0.40.2

func (p *AgePrivacy) DecryptOp(blob []byte) (Operation, error)

DecryptOp unseals a blob produced by EncryptOp.

func (*AgePrivacy) EncryptOp added in v0.40.2

func (p *AgePrivacy) EncryptOp(op Operation) ([]byte, error)

EncryptOp serialises the op as JSON then seals it for every recipient.

func (*AgePrivacy) Name added in v0.40.2

func (*AgePrivacy) Name() string

Name returns the stable wire tag for this backend.

type AnchorConfig added in v0.40.2

type AnchorConfig struct {
	// AppID identifies the application (32-byte hash of app name).
	AppID [32]byte

	// Interval is the maximum time between anchors. Default: 5 minutes.
	Interval time.Duration

	// OpThreshold is the number of operations that trigger an anchor.
	// Default: 1000. Set to 0 to disable op-based anchoring.
	OpThreshold int

	// RPCEndpoint is the JSON-RPC URL of the Lux node.
	RPCEndpoint string

	// From is the sender address (hex-encoded, with 0x prefix).
	From string

	// ContractAddress is the anchor precompile address.
	// Default: 0x0700000000000000000000000000000000000010
	ContractAddress string
}

AnchorConfig configures the background anchoring goroutine.

type AnchorStatus added in v0.40.2

type AnchorStatus struct {
	LastHeight     uint64
	LastRoot       [32]byte
	LastAnchoredAt time.Time
	LastError      error
}

AnchorStatus reports the current state of the auto-anchoring loop.

type Anchorer added in v0.40.2

type Anchorer interface {
	// Submit anchors the given Merkle root at the next logical height.
	Submit(ctx context.Context, root [32]byte) error

	// Verify checks whether the on-chain anchor at height matches root.
	Verify(ctx context.Context, height uint64, root [32]byte) (bool, error)

	// LatestHeight returns the highest anchored height for the configured appID.
	LatestHeight(ctx context.Context) (uint64, error)
}

Anchorer submits and verifies CRDT state anchors on a Lux chain.

type Document

type Document struct {
	// contains filtered or unexported fields
}

Document is a container for multiple named CRDT fields, representing a collaborative document.

func Decode

func Decode(data []byte, nodeID NodeID, opts ...DocumentOption) (*Document, error)

Decode deserializes a document from bytes. Pass the same DocumentOptions (especially WithPrivacy) used when the document was created so sealed snapshots can be unsealed.

func NewDocument

func NewDocument(id string, nodeID NodeID, opts ...DocumentOption) *Document

NewDocument creates a new Document with the given ID and owning nodeID. Pass WithPrivacy to select an encryption backend; default is plaintext.

func (*Document) AnchorStatus added in v0.40.2

func (d *Document) AnchorStatus() AnchorStatus

AnchorStatus returns the current state of the auto-anchor loop. Returns a zero value if no anchorer is configured.

func (*Document) Close added in v0.40.2

func (d *Document) Close()

Close stops the auto-anchor goroutine (if running) and waits for it to exit. Safe to call multiple times or on a Document with no anchorer.

func (*Document) Diff

func (d *Document) Diff(since StateVersion) []Operation

Diff returns operations representing changes since the given state version. Currently operates at text-field granularity using RGA state vectors.

func (*Document) Encode

func (d *Document) Encode() ([]byte, error)

Encode serializes the document to bytes. For non-plaintext backends, the snapshot is sealed through the privacy backend so no plaintext leaks into the output.

func (*Document) GetCounter

func (d *Document) GetCounter(field string) *PNCounter

GetCounter returns the PNCounter field with the given name, creating it if needed.

func (*Document) GetMVRegister

func (d *Document) GetMVRegister(field string) *MVRegister

GetMVRegister returns the MVRegister field with the given name, creating it if needed.

func (*Document) GetRegister

func (d *Document) GetRegister(field string) *LWWRegister

GetRegister returns the LWWRegister field with the given name, creating it if needed.

func (*Document) GetSet

func (d *Document) GetSet(field string) *ORSet

GetSet returns the ORSet field with the given name, creating it if needed.

func (*Document) GetText

func (d *Document) GetText(field string) *RGA

GetText returns the RGA text field with the given name, creating it if needed.

func (*Document) ID

func (d *Document) ID() string

ID returns the document identifier.

func (*Document) Merge

func (d *Document) Merge(other *Document)

Merge merges a remote Document into this one.

func (*Document) OpenOps added in v0.40.2

func (d *Document) OpenOps(envs []OpEnvelope) ([]Operation, error)

OpenOps decrypts a slice of OpEnvelopes into plaintext Operations. Returns ErrPrivacyMismatch if any envelope's tag differs from the document's backend, or ErrDocIDReplay if the authenticated docID inside the envelope does not match this document.

func (*Document) Privacy added in v0.40.2

func (d *Document) Privacy() Privacy

Privacy returns the document's privacy backend.

func (*Document) SealOps added in v0.40.2

func (d *Document) SealOps(ops []Operation) ([]OpEnvelope, error)

SealOps encrypts a slice of plaintext Operations into OpEnvelopes using the document's privacy backend. Each op's Data is prefixed with the document ID inside the authenticated payload; OpenOps refuses envelopes whose bound docID differs.

func (*Document) Version

func (d *Document) Version() StateVersion

Version returns a computed state version that merges the document's own version with state vectors from all text fields (RGAs).

type DocumentOption added in v0.40.2

type DocumentOption func(*Document)

DocumentOption configures a Document at creation time.

func WithAutoAnchor added in v0.40.2

func WithAutoAnchor(a Anchorer, interval time.Duration) DocumentOption

WithAutoAnchor enables periodic Merkle-root anchoring to a Lux chain. The goroutine starts when the Document is created and stops on Close(). If not supplied, no goroutine runs (zero overhead).

func WithPrivacy added in v0.40.2

func WithPrivacy(p Privacy) DocumentOption

WithPrivacy sets the privacy backend for a Document. Default: NewPlaintextPrivacy() (zero-overhead).

type DocumentSnapshot

type DocumentSnapshot struct {
	ID       string                         `json:"id"`
	Version  StateVersion                   `json:"version"`
	Texts    map[string]*textSnapshot       `json:"texts,omitempty"`
	Counters map[string]*counterSnapshot    `json:"counters,omitempty"`
	Sets     map[string]*setSnapshot        `json:"sets,omitempty"`
	Regs     map[string]*registerSnapshot   `json:"registers,omitempty"`
	MVRegs   map[string]*mvRegisterSnapshot `json:"mvRegisters,omitempty"`
}

DocumentSnapshot is the serializable state of a Document.

type FieldType

type FieldType uint8

FieldType identifies the CRDT type of a document field.

const (
	FieldTypeText       FieldType = iota // RGA
	FieldTypeCounter                     // PNCounter
	FieldTypeSet                         // ORSet
	FieldTypeRegister                    // LWWRegister
	FieldTypeMVRegister                  // MVRegister
)

type GCounter

type GCounter struct {
	// contains filtered or unexported fields
}

GCounter is a grow-only counter CRDT. Each node maintains its own count; the total is the sum of all counts.

func NewGCounter

func NewGCounter() *GCounter

NewGCounter returns a new empty GCounter.

func (*GCounter) Increment

func (g *GCounter) Increment(nodeID NodeID, delta uint64)

Increment adds delta to the count for nodeID.

func (*GCounter) Merge

func (g *GCounter) Merge(other *GCounter)

Merge merges a remote GCounter into this one by taking the max for each node.

func (*GCounter) State

func (g *GCounter) State() map[NodeID]uint64

State returns a copy of the internal state map.

func (*GCounter) Value

func (g *GCounter) Value() uint64

Value returns the total count across all nodes.

type LWWRegister

type LWWRegister struct {
	// contains filtered or unexported fields
}

LWWRegister is a register that resolves concurrent writes by timestamp. The most recent write (highest timestamp) wins.

func NewLWWRegister

func NewLWWRegister() *LWWRegister

NewLWWRegister returns a new empty LWWRegister.

func (*LWWRegister) Get

func (r *LWWRegister) Get() (any, Timestamp)

Get returns the current value and its timestamp.

func (*LWWRegister) Merge

func (r *LWWRegister) Merge(other *LWWRegister)

Merge merges a remote register into this one.

func (*LWWRegister) Set

func (r *LWWRegister) Set(value any, ts Timestamp)

Set updates the register value if the given timestamp is newer.

type MVEntry

type MVEntry struct {
	Value     any       `json:"value"`
	Timestamp Timestamp `json:"timestamp"`
}

MVEntry is a single versioned value in an MVRegister.

type MVRegister

type MVRegister struct {
	// contains filtered or unexported fields
}

MVRegister preserves all concurrent writes rather than picking one winner. It maintains a set of (value, timestamp) pairs. When a new value is set, it replaces all entries that are causally dominated.

func NewMVRegister

func NewMVRegister() *MVRegister

NewMVRegister returns a new empty MVRegister.

func (*MVRegister) Get

func (r *MVRegister) Get() []MVEntry

Get returns all concurrent values.

func (*MVRegister) Merge

func (r *MVRegister) Merge(other *MVRegister)

Merge merges a remote MVRegister. The result is the union of non-dominated entries.

func (*MVRegister) Set

func (r *MVRegister) Set(value any, ts Timestamp)

Set adds a new value, removing all entries whose timestamp is dominated by ts.

type NodeID

type NodeID = string

NodeID identifies a unique node/client in the distributed system.

type ORSet

type ORSet struct {
	// contains filtered or unexported fields
}

ORSet is an observed-remove set CRDT. Elements can be added and removed without conflicts. Concurrent add + remove: the add wins (add-wins semantics).

func NewORSet

func NewORSet(nodeID NodeID) *ORSet

NewORSet returns a new ORSet for the given node.

func (*ORSet) Add

func (s *ORSet) Add(key string, value any) string

Add adds an element to the set. Returns the generated unique tag.

func (*ORSet) Contains

func (s *ORSet) Contains(key string) bool

Contains checks if key is present in the set.

func (*ORSet) Elements

func (s *ORSet) Elements() map[string]any

Elements returns all keys currently in the set with one representative value each.

func (*ORSet) Merge

func (s *ORSet) Merge(other *ORSet)

Merge merges a remote ORSet into this one (union of tags).

func (*ORSet) RawState

func (s *ORSet) RawState() map[string]map[string]any

RawState returns a deep copy of internal tag state for serialization.

func (*ORSet) Remove

func (s *ORSet) Remove(key string)

Remove removes an element by key, removing all observed tags.

type ORSetElement

type ORSetElement struct {
	Value any
}

ORSetElement tracks the unique tags for an element. An element is in the set if it has at least one tag not in the tombstone set.

type OpEnvelope added in v0.40.2

type OpEnvelope struct {
	PrivacyTag string `json:"privacyTag"`
	Ciphertext []byte `json:"ct"`
}

OpEnvelope is the wire/persistence format for a single CRDT operation. It carries the privacy backend tag and the ciphertext (output of Privacy.EncryptOp). Replicas that receive an envelope whose tag does not match their own backend MUST reject it.

type OpType

type OpType uint8

OpType identifies an RGA operation kind.

const (
	OpInsert OpType = iota
	OpDelete
)

type Operation

type Operation struct {
	Field     string    `json:"field"`
	FieldType FieldType `json:"fieldType"`
	Data      []byte    `json:"data"`
}

Operation represents a serializable CRDT operation for sync.

type PNCounter

type PNCounter struct {
	// contains filtered or unexported fields
}

PNCounter is a counter that supports both increment and decrement by composing two GCounters.

func NewPNCounter

func NewPNCounter() *PNCounter

NewPNCounter returns a new PNCounter.

func (*PNCounter) Decrement

func (pn *PNCounter) Decrement(nodeID NodeID, delta uint64)

Decrement adds delta to the negative counter for nodeID.

func (*PNCounter) Increment

func (pn *PNCounter) Increment(nodeID NodeID, delta uint64)

Increment adds delta to the positive counter for nodeID.

func (*PNCounter) Merge

func (pn *PNCounter) Merge(other *PNCounter)

Merge merges a remote PNCounter into this one.

func (*PNCounter) Value

func (pn *PNCounter) Value() int64

Value returns positive - negative as a signed integer.

type PlaintextPrivacy added in v0.40.2

type PlaintextPrivacy struct{}

PlaintextPrivacy is the zero-overhead backend: it just encodes the op as JSON and reports no encryption. It exists so the CRDT layer has one code path; there are no `if privacy == nil` branches.

func NewPlaintextPrivacy added in v0.40.2

func NewPlaintextPrivacy() *PlaintextPrivacy

NewPlaintextPrivacy returns the no-op encryption backend.

func (PlaintextPrivacy) DecryptOp added in v0.40.2

func (PlaintextPrivacy) DecryptOp(b []byte) (Operation, error)

func (PlaintextPrivacy) EncryptOp added in v0.40.2

func (PlaintextPrivacy) EncryptOp(op Operation) ([]byte, error)

func (PlaintextPrivacy) Name added in v0.40.2

func (PlaintextPrivacy) Name() string

type Privacy added in v0.40.2

type Privacy interface {
	// Name returns a stable identifier carried in every OpEnvelope so a
	// replica can refuse ops from an incompatible backend.
	Name() string

	// EncryptOp seals an operation for storage and transport.
	EncryptOp(op Operation) ([]byte, error)

	// DecryptOp unseals an op produced by EncryptOp.
	DecryptOp(blob []byte) (Operation, error)
}

Privacy is the pluggable encryption boundary for a CRDT op-log.

Every op produced by the local replica is sealed with EncryptOp before persistence or gossip. Every op consumed from the log is opened with DecryptOp before application. One interface, one path.

func DefaultPrivacy added in v0.40.2

func DefaultPrivacy() Privacy

DefaultPrivacy returns a Privacy implementation suitable for the current build. The default is plaintext so tests and dev loops require no key material. Production callers pass an explicit NewAgePrivacy(...) or NewFHEPrivacy(...) to NewDocument().

type RGA

type RGA struct {
	// contains filtered or unexported fields
}

RGA (Replicated Growable Array) is a CRDT for collaborative text editing. It maintains a linked list of character nodes with unique IDs that allow concurrent inserts to be ordered deterministically.

func NewRGA

func NewRGA(nodeID NodeID) *RGA

NewRGA creates a new RGA instance for the given node.

func (*RGA) ApplyOp

func (r *RGA) ApplyOp(op RGAOp) error

ApplyOp applies a remote operation. Returns an error if the operation references a parent that doesn't exist (operation should be retried later).

func (*RGA) Delete

func (r *RGA) Delete(position int) (RGAOp, error)

Delete deletes the character at the given visible position (0-based). Returns the generated operation for replication.

func (*RGA) Insert

func (r *RGA) Insert(position int, ch rune) RGAOp

Insert inserts a character after the given position (0-based index). Position -1 means insert at the beginning. Returns the generated operation for replication.

func (*RGA) InsertText

func (r *RGA) InsertText(position int, text string) []RGAOp

InsertText inserts a string starting at the given position. Returns the operations generated.

func (*RGA) Length

func (r *RGA) Length() int

Length returns the number of visible characters.

func (*RGA) Merge

func (r *RGA) Merge(other *RGA) error

Merge applies all operations from a remote RGA. This is a convenience method that replays operations.

func (*RGA) Operations

func (r *RGA) Operations() []RGAOp

Operations returns pending operations and clears the pending buffer.

func (*RGA) OpsSince

func (r *RGA) OpsSince(sv map[NodeID]uint64) []RGAOp

OpsSince returns all operations for nodes whose sequence is greater than the values in the provided state vector.

func (*RGA) StateVector

func (r *RGA) StateVector() map[NodeID]uint64

StateVector returns a map of nodeID -> max sequence seen, used for sync.

func (*RGA) ToString

func (r *RGA) ToString() string

ToString returns the current visible text content.

type RGAID

type RGAID struct {
	Seq    uint64 `json:"seq"`
	NodeID NodeID `json:"nodeId"`
}

RGAID uniquely identifies a character node in the RGA.

func (RGAID) After

func (id RGAID) After(other RGAID) bool

After reports whether id should be ordered after other (for tie-breaking).

func (RGAID) IsZero

func (id RGAID) IsZero() bool

IsZero reports whether the ID is the zero/sentinel value.

func (RGAID) String

func (id RGAID) String() string

String returns a human-readable ID.

type RGAOp

type RGAOp struct {
	Type      OpType    `json:"type"`
	ID        RGAID     `json:"id"`
	ParentID  RGAID     `json:"parentId"` // for insert: the ID of the node this is inserted after
	Char      rune      `json:"char"`
	Timestamp Timestamp `json:"timestamp"`
}

RGAOp represents a single RGA operation for replication.

type RPCAnchorer added in v0.40.2

type RPCAnchorer struct {
	// contains filtered or unexported fields
}

RPCAnchorer implements Anchorer via JSON-RPC calls to a Lux node.

func NewRPCAnchorer added in v0.40.2

func NewRPCAnchorer(cfg AnchorConfig, rpcCall func(ctx context.Context, method string, params []any) (json.RawMessage, error)) *RPCAnchorer

NewRPCAnchorer creates an Anchorer that talks to a Lux node via JSON-RPC. The rpcCall function is injected for testability. Pass nil to use the default HTTP JSON-RPC client targeting cfg.RPCEndpoint.

func (*RPCAnchorer) LatestHeight added in v0.40.2

func (a *RPCAnchorer) LatestHeight(ctx context.Context) (uint64, error)

LatestHeight queries the latest anchored height via eth_call.

func (*RPCAnchorer) Submit added in v0.40.2

func (a *RPCAnchorer) Submit(ctx context.Context, root [32]byte) error

Submit sends an anchor transaction via eth_sendTransaction.

func (*RPCAnchorer) Verify added in v0.40.2

func (a *RPCAnchorer) Verify(ctx context.Context, height uint64, root [32]byte) (bool, error)

Verify queries the anchor precompile via eth_call and compares roots.

type StateVersion

type StateVersion map[NodeID]uint64

StateVersion represents a document's version as a state vector. Maps nodeID -> max sequence number seen from that node.

func (StateVersion) Dominates

func (v StateVersion) Dominates(other StateVersion) bool

Dominates reports whether v causally dominates other (every entry in other is <= the corresponding entry in v).

func (StateVersion) Merge

func (v StateVersion) Merge(other StateVersion) StateVersion

Merge returns a new StateVersion taking the max of each entry.

type SyncBroadcastFunc

type SyncBroadcastFunc func(docID string, excludeClient string, msg []byte)

SyncBroadcastFunc is called when operations need to be broadcast to other clients.

type SyncManager

type SyncManager struct {
	// contains filtered or unexported fields
}

SyncManager handles CRDT document synchronization across clients.

func NewSyncManager

func NewSyncManager(broadcast SyncBroadcastFunc) *SyncManager

NewSyncManager creates a new SyncManager. The broadcast function is called whenever operations should be sent to other clients.

func (*SyncManager) BroadcastOps

func (sm *SyncManager) BroadcastOps(docID string, ops []Operation)

BroadcastOps seals and broadcasts operations to all clients connected to a document.

func (*SyncManager) Documents

func (sm *SyncManager) Documents() []string

Documents returns a list of all registered document IDs.

func (*SyncManager) GetDocument

func (sm *SyncManager) GetDocument(id string) *Document

GetDocument returns a registered document by ID.

func (*SyncManager) GetOrCreateDocument

func (sm *SyncManager) GetOrCreateDocument(id string, nodeID NodeID, opts ...DocumentOption) *Document

GetOrCreateDocument returns an existing document or creates a new one.

func (*SyncManager) HandleSync

func (sm *SyncManager) HandleSync(clientID string, raw []byte) ([]byte, error)

HandleSync processes an incoming sync message from a client and returns a response message (if any). This implements the state-vector sync protocol:

  1. Client sends SyncStep1 with its state vector
  2. Server responds with SyncStep2 containing ops the client is missing
  3. Server also sends the server's state vector so the client can respond with SyncStep2
  4. Incremental updates are broadcast as SyncUpdate

func (*SyncManager) RegisterDocument

func (sm *SyncManager) RegisterDocument(id string, doc *Document)

RegisterDocument adds a document to the sync manager.

func (*SyncManager) UnregisterDocument

func (sm *SyncManager) UnregisterDocument(id string)

UnregisterDocument removes a document from the sync manager.

type SyncMessage

type SyncMessage struct {
	Type        string       `json:"type"`
	DocID       string       `json:"docId"`
	ClientID    string       `json:"clientId,omitempty"`
	StateVector StateVersion `json:"stateVector,omitempty"`
	Envelopes   []OpEnvelope `json:"envelopes,omitempty"`
}

SyncMessage is the wire format for CRDT sync messages. Every message carries Envelopes — sealed ops produced by SealOps. There is no plaintext-ops field; PlaintextPrivacy produces envelopes that happen to contain JSON as the "ciphertext". One wire format.

type Timestamp

type Timestamp struct {
	Time   int64  `json:"time"`
	NodeID NodeID `json:"nodeId"`
}

Timestamp is a Lamport-style logical clock for ordering operations.

func (Timestamp) After

func (t Timestamp) After(other Timestamp) bool

After reports whether t is causally after other. Ties are broken by NodeID lexicographic order.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL