version

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package version provides various version implementations for the synckit library.

This package contains implementations of the sync.Version interface, which is used to track causality and ordering of events in distributed systems.

Vector Clocks

The primary implementation is VectorClock, which provides robust causality tracking for distributed systems. Vector clocks can determine if one version happened-before, happened-after, or is concurrent with another version.

Basic Usage

import "github.com/c0deZ3R0/go-sync-kit/version"

// Create a new vector clock
clock := version.NewVectorClock()

// Increment when a node creates an event
clock.Increment("node-1")

// Merge when receiving events from another node
otherClock := version.NewVectorClockFromString(`{"node-2": 3}`)
clock.Merge(otherClock)

// Compare clocks to determine causality
relationship := clock.Compare(otherClock)
switch relationship {
case -1:
	fmt.Println("clock happened-before otherClock")
case 1:
	fmt.Println("clock happened-after otherClock")
case 0:
	fmt.Println("clocks are concurrent or equal")
}

Distributed System Example

Vector clocks are particularly useful in distributed systems where nodes may be offline and need to sync later:

// Three nodes in a distributed system
nodeA := version.NewVectorClock()
nodeB := version.NewVectorClock()
nodeC := version.NewVectorClock()

// Node A creates an event
nodeA.Increment("A")

// Node B creates an event independently (concurrent)
nodeB.Increment("B")

// Node A and B are concurrent at this point
fmt.Println("A concurrent with B:", nodeA.IsConcurrentWith(nodeB))

// Node B receives A's state and creates another event
nodeB.Merge(nodeA)
nodeB.Increment("B")

// Now B happened-after A
fmt.Println("B happened after A:", nodeB.HappenedAfter(nodeA))

Serialization

Vector clocks can be serialized to JSON for storage or network transmission:

clock := version.NewVectorClock()
clock.Increment("node-1")
clock.Increment("node-2")

// Serialize to JSON string
jsonStr := clock.String() // {"node-1":1,"node-2":1}

// Deserialize from JSON string
restored, err := version.NewVectorClockFromString(jsonStr)
if err != nil {
	log.Fatal(err)
}

Integration with go-sync-kit

Vector clocks implement the sync.Version interface and can be used with any EventStore or Transport implementation:

import (
	"github.com/c0deZ3R0/go-sync-kit"
	"github.com/c0deZ3R0/go-sync-kit/version"
	"github.com/c0deZ3R0/go-sync-kit/storage/sqlite"
)

// Create a vector clock version
currentVersion := version.NewVectorClock()
currentVersion.Increment("my-node-id")

// Use with EventStore
store, _ := sqlite.NewWithDataSource("events.db")
err := store.Store(ctx, event, currentVersion)

// The EventStore will handle version parsing automatically
events, err := store.Load(ctx, currentVersion)

Performance Considerations

Vector clocks grow with the number of nodes in the system. In systems with many nodes, consider:

- Using node ID prefixes to group related nodes - Implementing periodic cleanup of inactive nodes - Monitoring vector clock size in production

The implementation is optimized for typical distributed system usage with reasonable numbers of nodes (dozens to hundreds).

Package version provides various version implementations for the go-sync-kit library. Vector clocks are particularly useful for tracking causality in distributed systems.

Example (Basic)

Example_basic demonstrates basic vector clock operations.

package main

import (
	"fmt"
	"log"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Create a new vector clock
	clock := version.NewVectorClock()
	fmt.Printf("New clock: %s\n", clock.String())

	// Increment when a node creates an event
	clock.Increment("node-1")
	fmt.Printf("After increment: %s\n", clock.String())

	// Create another clock from JSON
	otherClock, err := version.NewVectorClockFromString(`{"node-2": 3, "node-1": 1}`)
	if err != nil {
		log.Fatal(err)
	}

	// Compare clocks to determine causality
	relationship := clock.Compare(otherClock)
	switch relationship {
	case -1:
		fmt.Println("clock happened-before otherClock")
	case 1:
		fmt.Println("clock happened-after otherClock")
	case 0:
		fmt.Println("clocks are concurrent or equal")
	}

}
Output:

New clock: {}
After increment: {"node-1":1}
clock happened-before otherClock
Example (ConflictDetection)

Example_conflictDetection shows how vector clocks can detect conflicts in a distributed system scenario.

package main

import (
	"fmt"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Simulate two users editing the same document offline
	fmt.Println("=== Two users start with the same document version ===")
	userAlice, _ := version.NewVectorClockFromString(`{"server":5}`)
	userBob, _ := version.NewVectorClockFromString(`{"server":5}`)

	fmt.Printf("Alice's version: %s\n", userAlice.String())
	fmt.Printf("Bob's version: %s\n", userBob.String())

	fmt.Println("\n=== Both users make changes offline ===")
	userAlice.Increment("alice")
	userBob.Increment("bob")

	fmt.Printf("Alice after edit: %s\n", userAlice.String())
	fmt.Printf("Bob after edit: %s\n", userBob.String())

	fmt.Println("\n=== Detect conflict when they try to sync ===")
	if userAlice.IsConcurrentWith(userBob) {
		fmt.Println("⚠️  CONFLICT DETECTED: Both users made concurrent changes!")
		fmt.Println("Need conflict resolution strategy.")
	} else if userAlice.HappenedAfter(userBob) {
		fmt.Println("Alice's changes can be applied (happened after Bob's)")
	} else if userBob.HappenedAfter(userAlice) {
		fmt.Println("Bob's changes can be applied (happened after Alice's)")
	}

	fmt.Println("\n=== After conflict resolution (merge both changes) ===")
	// Simulate conflict resolution by merging both versions
	resolved := userAlice.Clone()
	resolved.Merge(userBob)
	resolved.Increment("server") // Server creates merge event

	fmt.Printf("Resolved version: %s\n", resolved.String())
	fmt.Printf("Resolved happened after Alice: %t\n", resolved.HappenedAfter(userAlice))
	fmt.Printf("Resolved happened after Bob: %t\n", resolved.HappenedAfter(userBob))

}
Output:

=== Two users start with the same document version ===
Alice's version: {"server":5}
Bob's version: {"server":5}

=== Both users make changes offline ===
Alice after edit: {"alice":1,"server":5}
Bob after edit: {"bob":1,"server":5}

=== Detect conflict when they try to sync ===
⚠️  CONFLICT DETECTED: Both users made concurrent changes!
Need conflict resolution strategy.

=== After conflict resolution (merge both changes) ===
Resolved version: {"alice":1,"bob":1,"server":6}
Resolved happened after Alice: true
Resolved happened after Bob: true
Example (DistributedScenario)

Example_distributedScenario demonstrates how vector clocks work in a distributed system with multiple nodes.

package main

import (
	"fmt"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Three nodes in a distributed system
	nodeA := version.NewVectorClock()
	nodeB := version.NewVectorClock()
	nodeC := version.NewVectorClock()

	fmt.Println("=== Initial State ===")
	fmt.Printf("Node A: %s\n", nodeA.String())
	fmt.Printf("Node B: %s\n", nodeB.String())
	fmt.Printf("Node C: %s\n", nodeC.String())

	fmt.Println("\n=== Node A creates an event ===")
	nodeA.Increment("A")
	fmt.Printf("Node A: %s\n", nodeA.String())

	fmt.Println("\n=== Node B creates an event independently ===")
	nodeB.Increment("B")
	fmt.Printf("Node B: %s\n", nodeB.String())
	fmt.Printf("A concurrent with B: %t\n", nodeA.IsConcurrentWith(nodeB))

	fmt.Println("\n=== Node B receives A's state and creates another event ===")
	nodeB.Merge(nodeA)
	nodeB.Increment("B")
	fmt.Printf("Node B after merge and increment: %s\n", nodeB.String())
	fmt.Printf("B happened after A: %t\n", nodeB.HappenedAfter(nodeA))

	fmt.Println("\n=== Node C joins and creates an event ===")
	nodeC.Increment("C")
	fmt.Printf("Node C: %s\n", nodeC.String())
	fmt.Printf("C concurrent with A: %t\n", nodeC.IsConcurrentWith(nodeA))
	fmt.Printf("C concurrent with B: %t\n", nodeC.IsConcurrentWith(nodeB))

	fmt.Println("\n=== Node C syncs with A and B ===")
	nodeC.Merge(nodeA)
	nodeC.Merge(nodeB)
	nodeC.Increment("C")
	fmt.Printf("Node C after sync: %s\n", nodeC.String())
	fmt.Printf("C happened after A: %t\n", nodeC.HappenedAfter(nodeA))
	fmt.Printf("C happened after B: %t\n", nodeC.HappenedAfter(nodeB))

}
Output:

=== Initial State ===
Node A: {}
Node B: {}
Node C: {}

=== Node A creates an event ===
Node A: {"A":1}

=== Node B creates an event independently ===
Node B: {"B":1}
A concurrent with B: true

=== Node B receives A's state and creates another event ===
Node B after merge and increment: {"A":1,"B":2}
B happened after A: true

=== Node C joins and creates an event ===
Node C: {"C":1}
C concurrent with A: true
C concurrent with B: true

=== Node C syncs with A and B ===
Node C after sync: {"A":1,"B":2,"C":2}
C happened after A: true
C happened after B: true
Example (MapConstruction)

Example_mapConstruction shows how to create vector clocks from maps, useful for testing or when you have clock data in map format.

package main

import (
	"fmt"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Create from a map (useful for testing)
	clockData := map[string]uint64{
		"node-1": 5,
		"node-2": 3,
		"node-3": 1,
	}

	clock := version.NewVectorClockFromMap(clockData)
	fmt.Printf("Clock from map: %s\n", clock.String())
	fmt.Printf("Size: %d nodes\n", clock.Size())
	fmt.Printf("Node-2 clock value: %d\n", clock.GetClock("node-2"))

	// Get all clocks as a map (returns a copy)
	allClocks := clock.GetAllClocks()
	fmt.Printf("All clocks: %+v\n", allClocks)

	// Modifying the returned map doesn't affect the original
	allClocks["node-1"] = 100
	fmt.Printf("After modifying returned map, original node-1: %d\n", clock.GetClock("node-1"))

}
Output:

Clock from map: {"node-1":5,"node-2":3,"node-3":1}
Size: 3 nodes
Node-2 clock value: 3
All clocks: map[node-1:5 node-2:3 node-3:1]
After modifying returned map, original node-1: 5
Example (Serialization)

Example_serialization demonstrates how to serialize and deserialize vector clocks for storage or network transmission.

package main

import (
	"fmt"
	"log"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Create and populate a vector clock
	clock := version.NewVectorClock()
	clock.Increment("server-1")
	clock.Increment("server-2")
	clock.Increment("server-1") // server-1 creates another event

	fmt.Printf("Original clock: %s\n", clock.String())

	// Serialize to JSON string (for storage/transmission)
	jsonStr := clock.String()
	fmt.Printf("Serialized: %s\n", jsonStr)

	// Deserialize from JSON string
	restored, err := version.NewVectorClockFromString(jsonStr)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Restored clock: %s\n", restored.String())
	fmt.Printf("Are they equal? %t\n", clock.IsEqual(restored))

}
Output:

Original clock: {"server-1":2,"server-2":1}
Serialized: {"server-1":2,"server-2":1}
Restored clock: {"server-1":2,"server-2":1}
Are they equal? true

Index

Examples

Constants

View Source
const (
	// MaxNodeIDLength is the maximum allowed length for a node ID
	MaxNodeIDLength = 255

	// MaxNodes is the maximum number of nodes that can be tracked
	// This prevents memory issues from unbounded growth
	MaxNodes = 1000
)

Vector clock constraints

Variables

This section is empty.

Functions

This section is empty.

Types

type VectorClock

type VectorClock struct {
	// contains filtered or unexported fields
}

VectorClock implements the Version interface using a map of node IDs to logical clocks. It is used to determine the partial ordering of events in a distributed system. A vector clock can determine if one version happened-before, happened-after, or is concurrent with another version.

Vector clocks are ideal for: - Offline-first applications - Multi-master replication - Conflict detection in distributed systems - Event sourcing with multiple writers

func NewVectorClock

func NewVectorClock() *VectorClock

NewVectorClock creates an empty VectorClock. This is the primary constructor for creating vector clocks.

func NewVectorClockFromMap

func NewVectorClockFromMap(clocks map[string]uint64) *VectorClock

NewVectorClockFromMap creates a VectorClock from a map of node IDs to clock values. This is useful for testing or when you have clock data in map format. The input map is copied to prevent external mutations.

func NewVectorClockFromString

func NewVectorClockFromString(data string) (*VectorClock, error)

NewVectorClockFromString attempts to deserialize a JSON string into a VectorClock. This is useful for reconstructing a version received from storage or over the network.

The expected format is a JSON object mapping node IDs to clock values: {"node-1": 5, "node-2": 3}

Returns an error if the input string is not valid JSON or contains invalid data.

func (*VectorClock) Clone

func (vc *VectorClock) Clone() *VectorClock

Clone creates a deep copy of the VectorClock. This is useful when you need to create a snapshot or avoid mutations.

func (*VectorClock) Compare

func (vc *VectorClock) Compare(other synckit.Version) int

Compare determines the causal relationship between two VectorClocks. It returns:

  • -1: if this VectorClock happened-before the other (this ≺ other)
  • 1: if this VectorClock happened-after the other (this ≻ other)
  • 0: if they are concurrent or identical (this || other)

The comparison follows the standard vector clock partial ordering: - A ≺ B if A[i] ≤ B[i] for all i, and A[j] < B[j] for at least one j - A ≻ B if B ≺ A - A || B if neither A ≺ B nor B ≺ A (concurrent)

func (*VectorClock) GetAllClocks

func (vc *VectorClock) GetAllClocks() map[string]uint64

GetAllClocks returns a copy of the internal clock map. This prevents external mutation of the vector clock's internal state.

func (*VectorClock) GetClock

func (vc *VectorClock) GetClock(nodeID string) uint64

GetClock returns the clock value for a specific node ID. Returns 0 if the node ID has not been observed in this vector clock.

func (*VectorClock) HappenedAfter

func (vc *VectorClock) HappenedAfter(other *VectorClock) bool

HappenedAfter returns true if this vector clock happened-after the other.

func (*VectorClock) HappenedBefore

func (vc *VectorClock) HappenedBefore(other *VectorClock) bool

HappenedBefore returns true if this vector clock happened-before the other.

func (*VectorClock) Increment

func (vc *VectorClock) Increment(nodeID string) error

Increment increases the logical clock for a given node ID. This should be called whenever a node generates a new event.

Example usage:

clock := NewVectorClock()
clock.Increment("node-1") // {"node-1": 1}
clock.Increment("node-1") // {"node-1": 2}

Returns an error if: - The node ID is empty - The node ID exceeds MaxNodeIDLength - Adding this node would exceed MaxNodes

func (*VectorClock) IsConcurrentWith

func (vc *VectorClock) IsConcurrentWith(other *VectorClock) bool

IsConcurrentWith returns true if this vector clock is concurrent with another. Two vector clocks are concurrent if neither happened-before the other.

func (*VectorClock) IsEqual

func (vc *VectorClock) IsEqual(other *VectorClock) bool

IsEqual returns true if two vector clocks are identical.

func (*VectorClock) IsZero

func (vc *VectorClock) IsZero() bool

IsZero returns true if the VectorClock is empty (no nodes have been observed). This is equivalent to the initial state of a vector clock.

func (*VectorClock) Merge

func (vc *VectorClock) Merge(other *VectorClock) error

Merge combines this vector clock with another, taking the maximum clock value for each node present in either clock. This is essential for maintaining causal history when a node receives events from another node.

The merge operation ensures that the resulting clock incorporates all the causal history that both clocks have observed.

Example:

clock1 := {"node-1": 2, "node-2": 1}
clock2 := {"node-1": 1, "node-3": 2}
clock1.Merge(clock2) results in {"node-1": 2, "node-2": 1, "node-3": 2}

Returns an error if: - The resulting merged clock would exceed MaxNodes - Any node ID in the other clock exceeds MaxNodeIDLength

func (*VectorClock) Size

func (vc *VectorClock) Size() int

Size returns the number of nodes tracked by this vector clock.

func (*VectorClock) String

func (vc *VectorClock) String() string

String serializes the VectorClock to a JSON string for storage or transport. The format is a JSON object mapping node IDs to their clock values.

Examples:

  • Empty clock: "{}"
  • Single node: {"node-1":5}
  • Multiple nodes: {"node-1":5,"node-2":3}

type VectorClockError added in v0.5.0

type VectorClockError struct {
	Msg string
}

VectorClockError represents errors that can occur during vector clock operations

func (*VectorClockError) Error added in v0.5.0

func (e *VectorClockError) Error() string

type VectorClockManager

type VectorClockManager struct {
	// contains filtered or unexported fields
}

VectorClockManager implements VersionManager for vector clock versioning.

func NewVectorClockManager

func NewVectorClockManager() *VectorClockManager

NewVectorClockManager creates a new vector clock version manager.

func NewVectorClockManagerFromVersion

func NewVectorClockManagerFromVersion(version synckit.Version) (*VectorClockManager, error)

NewVectorClockManagerFromVersion creates a vector clock manager from an existing version.

func (*VectorClockManager) Clone

func (vm *VectorClockManager) Clone() VersionManager

Clone creates a copy of the version manager.

func (*VectorClockManager) CurrentVersion

func (vm *VectorClockManager) CurrentVersion() synckit.Version

CurrentVersion returns the current vector clock state.

func (*VectorClockManager) NextVersion

func (vm *VectorClockManager) NextVersion(nodeID string) synckit.Version

NextVersion increments the clock for the given node and returns the new version.

func (*VectorClockManager) UpdateFromVersion

func (vm *VectorClockManager) UpdateFromVersion(version synckit.Version) error

UpdateFromVersion merges the observed version into the current state.

type VersionManager

type VersionManager interface {
	// CurrentVersion returns the current version state
	CurrentVersion() synckit.Version

	// NextVersion generates the next version for a new event
	// The nodeID parameter allows node-specific versioning (e.g., for vector clocks)
	NextVersion(nodeID string) synckit.Version

	// UpdateFromVersion updates the internal state based on an observed version
	// This is used when loading events from the store or receiving from peers
	UpdateFromVersion(version synckit.Version) error

	// Clone creates a copy of the version manager
	Clone() VersionManager
}

VersionManager defines the interface for managing version state. This allows different versioning strategies to be plugged in.

type VersionedStore

type VersionedStore struct {
	// contains filtered or unexported fields
}

VersionedStore is a decorator for an EventStore that manages versioning automatically. It uses a pluggable VersionManager to handle different versioning strategies.

func NewVersionedStore

func NewVersionedStore(store synckit.EventStore, nodeID string, versionManager VersionManager) (*VersionedStore, error)

NewVersionedStore creates a new versioned store decorator. It automatically initializes the version manager from the store's latest version.

func (*VersionedStore) Close

func (s *VersionedStore) Close() error

Close delegates to the underlying store.

func (*VersionedStore) GetVersionManager

func (s *VersionedStore) GetVersionManager() VersionManager

GetVersionManager returns the current version manager (useful for testing or advanced use cases).

func (*VersionedStore) LatestVersion

func (s *VersionedStore) LatestVersion(ctx context.Context) (synckit.Version, error)

LatestVersion returns the current version from the version manager.

func (*VersionedStore) Load

Load passes through to the underlying store and updates version manager state.

func (*VersionedStore) LoadByAggregate

func (s *VersionedStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version) ([]synckit.EventWithVersion, error)

LoadByAggregate passes through to the underlying store and updates version manager state.

func (*VersionedStore) ParseVersion

func (s *VersionedStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)

ParseVersion delegates to the underlying store.

func (*VersionedStore) SetNodeID

func (s *VersionedStore) SetNodeID(nodeID string)

SetNodeID updates the node ID for version generation.

func (*VersionedStore) Store

func (s *VersionedStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error

Store generates the next version and stores the event with that version.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL