Documentation ¶
Overview ¶
Package version provides various version implementations for the go-sync-kit library.
This package contains implementations of the sync.Version interface, which is used to track causality and ordering of events in distributed systems.
Vector Clocks ¶
The primary implementation is VectorClock, which provides robust causality tracking for distributed systems. Vector clocks can determine if one version happened-before, happened-after, or is concurrent with another version.
Basic Usage ¶
import "github.com/c0deZ3R0/go-sync-kit/version"

// Create a new vector clock
clock := version.NewVectorClock()

// Increment when a node creates an event
clock.Increment("node-1")

// Merge when receiving events from another node
otherClock, err := version.NewVectorClockFromString(`{"node-2": 3}`)
if err != nil {
	log.Fatal(err)
}
clock.Merge(otherClock)

// Compare clocks to determine causality
relationship := clock.Compare(otherClock)
switch relationship {
case -1:
	fmt.Println("clock happened-before otherClock")
case 1:
	fmt.Println("clock happened-after otherClock")
case 0:
	fmt.Println("clocks are concurrent or equal")
}
Distributed System Example ¶
Vector clocks are particularly useful in distributed systems where nodes may be offline and need to sync later:
// Three nodes in a distributed system
nodeA := version.NewVectorClock()
nodeB := version.NewVectorClock()
nodeC := version.NewVectorClock()
// Node A creates an event
nodeA.Increment("A")
// Node B creates an event independently (concurrent)
nodeB.Increment("B")
// Node A and B are concurrent at this point
fmt.Println("A concurrent with B:", nodeA.IsConcurrentWith(nodeB))
// Node B receives A's state and creates another event
nodeB.Merge(nodeA)
nodeB.Increment("B")
// Now B happened-after A
fmt.Println("B happened after A:", nodeB.HappenedAfter(nodeA))
Serialization ¶
Vector clocks can be serialized to JSON for storage or network transmission:
clock := version.NewVectorClock()
clock.Increment("node-1")
clock.Increment("node-2")
// Serialize to JSON string
jsonStr := clock.String() // {"node-1":1,"node-2":1}
// Deserialize from JSON string
restored, err := version.NewVectorClockFromString(jsonStr)
if err != nil {
	log.Fatal(err)
}
Integration with go-sync-kit ¶
Vector clocks implement the sync.Version interface and can be used with any EventStore or Transport implementation:
import (
	"github.com/c0deZ3R0/go-sync-kit"
	"github.com/c0deZ3R0/go-sync-kit/version"
	"github.com/c0deZ3R0/go-sync-kit/storage/sqlite"
)

// Create a vector clock version
currentVersion := version.NewVectorClock()
currentVersion.Increment("my-node-id")

// Use with EventStore
store, err := sqlite.NewWithDataSource("events.db")
if err != nil {
	log.Fatal(err)
}
err = store.Store(ctx, event, currentVersion)

// The EventStore will handle version parsing automatically
events, err := store.Load(ctx, currentVersion)
Performance Considerations ¶
Vector clocks grow with the number of nodes in the system. In systems with many nodes, consider:
- Using node ID prefixes to group related nodes
- Implementing periodic cleanup of inactive nodes
- Monitoring vector clock size in production
The implementation is optimized for typical distributed system usage with reasonable numbers of nodes (dozens to hundreds).
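As a rough illustration of the monitoring suggestion above, the following self-contained sketch measures a clock held as a plain map. The clockStats helper and the warnAtNodes threshold are hypothetical and not part of this package; with a real VectorClock, Size() and String() would supply the same numbers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// clockStats reports how many nodes a clock tracks and how many bytes its
// JSON form occupies. It is a hypothetical helper, not a library function.
func clockStats(clock map[string]uint64) (nodes, jsonBytes int) {
	encoded, err := json.Marshal(clock)
	if err != nil {
		return len(clock), 0
	}
	return len(clock), len(encoded)
}

func main() {
	const warnAtNodes = 100 // hypothetical alert threshold

	clock := map[string]uint64{"node-1": 5, "node-2": 3}
	nodes, size := clockStats(clock)
	fmt.Printf("nodes=%d json_bytes=%d\n", nodes, size)

	if nodes > warnAtNodes {
		fmt.Println("warning: vector clock is tracking many nodes")
	}
}
```

Both numbers are cheap to compute, so a check like this could run wherever versions are already being serialized.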
Example (Basic) ¶
Example_basic demonstrates basic vector clock operations.
package main

import (
	"fmt"
	"log"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Create a new vector clock
	clock := version.NewVectorClock()
	fmt.Printf("New clock: %s\n", clock.String())

	// Increment when a node creates an event
	clock.Increment("node-1")
	fmt.Printf("After increment: %s\n", clock.String())

	// Create another clock from JSON
	otherClock, err := version.NewVectorClockFromString(`{"node-2": 3, "node-1": 1}`)
	if err != nil {
		log.Fatal(err)
	}

	// Compare clocks to determine causality
	relationship := clock.Compare(otherClock)
	switch relationship {
	case -1:
		fmt.Println("clock happened-before otherClock")
	case 1:
		fmt.Println("clock happened-after otherClock")
	case 0:
		fmt.Println("clocks are concurrent or equal")
	}
}
Output:

New clock: {}
After increment: {"node-1":1}
clock happened-before otherClock
Example (ConflictDetection) ¶
Example_conflictDetection shows how vector clocks can detect conflicts in a distributed system scenario.
package main

import (
	"fmt"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Simulate two users editing the same document offline
	fmt.Println("=== Two users start with the same document version ===")
	userAlice, _ := version.NewVectorClockFromString(`{"server":5}`)
	userBob, _ := version.NewVectorClockFromString(`{"server":5}`)
	fmt.Printf("Alice's version: %s\n", userAlice.String())
	fmt.Printf("Bob's version: %s\n", userBob.String())

	fmt.Println("\n=== Both users make changes offline ===")
	userAlice.Increment("alice")
	userBob.Increment("bob")
	fmt.Printf("Alice after edit: %s\n", userAlice.String())
	fmt.Printf("Bob after edit: %s\n", userBob.String())

	fmt.Println("\n=== Detect conflict when they try to sync ===")
	if userAlice.IsConcurrentWith(userBob) {
		fmt.Println("⚠️ CONFLICT DETECTED: Both users made concurrent changes!")
		fmt.Println("Need conflict resolution strategy.")
	} else if userAlice.HappenedAfter(userBob) {
		fmt.Println("Alice's changes can be applied (happened after Bob's)")
	} else if userBob.HappenedAfter(userAlice) {
		fmt.Println("Bob's changes can be applied (happened after Alice's)")
	}

	fmt.Println("\n=== After conflict resolution (merge both changes) ===")
	// Simulate conflict resolution by merging both versions
	resolved := userAlice.Clone()
	resolved.Merge(userBob)
	resolved.Increment("server") // Server creates merge event
	fmt.Printf("Resolved version: %s\n", resolved.String())
	fmt.Printf("Resolved happened after Alice: %t\n", resolved.HappenedAfter(userAlice))
	fmt.Printf("Resolved happened after Bob: %t\n", resolved.HappenedAfter(userBob))
}
Output:

=== Two users start with the same document version ===
Alice's version: {"server":5}
Bob's version: {"server":5}

=== Both users make changes offline ===
Alice after edit: {"alice":1,"server":5}
Bob after edit: {"bob":1,"server":5}

=== Detect conflict when they try to sync ===
⚠️ CONFLICT DETECTED: Both users made concurrent changes!
Need conflict resolution strategy.

=== After conflict resolution (merge both changes) ===
Resolved version: {"alice":1,"bob":1,"server":6}
Resolved happened after Alice: true
Resolved happened after Bob: true
Example (DistributedScenario) ¶
Example_distributedScenario demonstrates how vector clocks work in a distributed system with multiple nodes.
package main

import (
	"fmt"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Three nodes in a distributed system
	nodeA := version.NewVectorClock()
	nodeB := version.NewVectorClock()
	nodeC := version.NewVectorClock()

	fmt.Println("=== Initial State ===")
	fmt.Printf("Node A: %s\n", nodeA.String())
	fmt.Printf("Node B: %s\n", nodeB.String())
	fmt.Printf("Node C: %s\n", nodeC.String())

	fmt.Println("\n=== Node A creates an event ===")
	nodeA.Increment("A")
	fmt.Printf("Node A: %s\n", nodeA.String())

	fmt.Println("\n=== Node B creates an event independently ===")
	nodeB.Increment("B")
	fmt.Printf("Node B: %s\n", nodeB.String())
	fmt.Printf("A concurrent with B: %t\n", nodeA.IsConcurrentWith(nodeB))

	fmt.Println("\n=== Node B receives A's state and creates another event ===")
	nodeB.Merge(nodeA)
	nodeB.Increment("B")
	fmt.Printf("Node B after merge and increment: %s\n", nodeB.String())
	fmt.Printf("B happened after A: %t\n", nodeB.HappenedAfter(nodeA))

	fmt.Println("\n=== Node C joins and creates an event ===")
	nodeC.Increment("C")
	fmt.Printf("Node C: %s\n", nodeC.String())
	fmt.Printf("C concurrent with A: %t\n", nodeC.IsConcurrentWith(nodeA))
	fmt.Printf("C concurrent with B: %t\n", nodeC.IsConcurrentWith(nodeB))

	fmt.Println("\n=== Node C syncs with A and B ===")
	nodeC.Merge(nodeA)
	nodeC.Merge(nodeB)
	nodeC.Increment("C")
	fmt.Printf("Node C after sync: %s\n", nodeC.String())
	fmt.Printf("C happened after A: %t\n", nodeC.HappenedAfter(nodeA))
	fmt.Printf("C happened after B: %t\n", nodeC.HappenedAfter(nodeB))
}
Output:

=== Initial State ===
Node A: {}
Node B: {}
Node C: {}

=== Node A creates an event ===
Node A: {"A":1}

=== Node B creates an event independently ===
Node B: {"B":1}
A concurrent with B: true

=== Node B receives A's state and creates another event ===
Node B after merge and increment: {"A":1,"B":2}
B happened after A: true

=== Node C joins and creates an event ===
Node C: {"C":1}
C concurrent with A: true
C concurrent with B: true

=== Node C syncs with A and B ===
Node C after sync: {"A":1,"B":2,"C":2}
C happened after A: true
C happened after B: true
Example (MapConstruction) ¶
Example_mapConstruction shows how to create vector clocks from maps, useful for testing or when you have clock data in map format.
package main

import (
	"fmt"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Create from a map (useful for testing)
	clockData := map[string]uint64{
		"node-1": 5,
		"node-2": 3,
		"node-3": 1,
	}
	clock := version.NewVectorClockFromMap(clockData)
	fmt.Printf("Clock from map: %s\n", clock.String())
	fmt.Printf("Size: %d nodes\n", clock.Size())
	fmt.Printf("Node-2 clock value: %d\n", clock.GetClock("node-2"))

	// Get all clocks as a map (returns a copy)
	allClocks := clock.GetAllClocks()
	fmt.Printf("All clocks: %+v\n", allClocks)

	// Modifying the returned map doesn't affect the original
	allClocks["node-1"] = 100
	fmt.Printf("After modifying returned map, original node-1: %d\n", clock.GetClock("node-1"))
}
Output:

Clock from map: {"node-1":5,"node-2":3,"node-3":1}
Size: 3 nodes
Node-2 clock value: 3
All clocks: map[node-1:5 node-2:3 node-3:1]
After modifying returned map, original node-1: 5
Example (Serialization) ¶
Example_serialization demonstrates how to serialize and deserialize vector clocks for storage or network transmission.
package main

import (
	"fmt"
	"log"

	"github.com/c0deZ3R0/go-sync-kit/version"
)

func main() {
	// Create and populate a vector clock
	clock := version.NewVectorClock()
	clock.Increment("server-1")
	clock.Increment("server-2")
	clock.Increment("server-1") // server-1 creates another event
	fmt.Printf("Original clock: %s\n", clock.String())

	// Serialize to JSON string (for storage/transmission)
	jsonStr := clock.String()
	fmt.Printf("Serialized: %s\n", jsonStr)

	// Deserialize from JSON string
	restored, err := version.NewVectorClockFromString(jsonStr)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Restored clock: %s\n", restored.String())
	fmt.Printf("Are they equal? %t\n", clock.IsEqual(restored))
}
Output:

Original clock: {"server-1":2,"server-2":1}
Serialized: {"server-1":2,"server-2":1}
Restored clock: {"server-1":2,"server-2":1}
Are they equal? true
Index ¶
- Constants
- Variables
- type VectorClock
- func (vc *VectorClock) Clone() *VectorClock
- func (vc *VectorClock) Compare(other types.Version) int
- func (vc *VectorClock) GetAllClocks() map[string]uint64
- func (vc *VectorClock) GetClock(nodeID string) uint64
- func (vc *VectorClock) HappenedAfter(other *VectorClock) bool
- func (vc *VectorClock) HappenedBefore(other *VectorClock) bool
- func (vc *VectorClock) Increment(nodeID string) error
- func (vc *VectorClock) IsConcurrentWith(other *VectorClock) bool
- func (vc *VectorClock) IsEqual(other *VectorClock) bool
- func (vc *VectorClock) IsZero() bool
- func (vc *VectorClock) Merge(other *VectorClock) error
- func (vc *VectorClock) Size() int
- func (vc *VectorClock) String() string
- type VectorClockError
- type VectorClockManager
- type VersionManager
- type VersionedStore
- func (s *VersionedStore) Close() error
- func (s *VersionedStore) GetVersionManager() VersionManager
- func (s *VersionedStore) LatestVersion(ctx context.Context) (synckit.Version, error)
- func (s *VersionedStore) Load(ctx context.Context, since synckit.Version) ([]synckit.EventWithVersion, error)
- func (s *VersionedStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version) ([]synckit.EventWithVersion, error)
- func (s *VersionedStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)
- func (s *VersionedStore) SetNodeID(nodeID string)
- func (s *VersionedStore) Store(ctx context.Context, event synckit.Event, version types.Version) error
Examples ¶
Constants ¶
const (
	// MaxNodeIDLength is the maximum allowed length for a node ID
	MaxNodeIDLength = 255

	// MaxNodes is the maximum number of nodes that can be tracked
	// This prevents memory issues from unbounded growth
	MaxNodes = 1000
)
Vector clock constraints
Variables ¶
var (
	ErrIncompatibleVersion = fmt.Errorf("incompatible version type")
	ErrStoreClosed         = fmt.Errorf("store is closed")
)
Error types
Functions ¶
This section is empty.
Types ¶
type VectorClock ¶
type VectorClock struct {
	// contains filtered or unexported fields
}
VectorClock implements the Version interface using a map of node IDs to logical clocks. It is used to determine the partial ordering of events in a distributed system. A vector clock can determine if one version happened-before, happened-after, or is concurrent with another version.
Vector clocks are ideal for:
- Offline-first applications
- Multi-master replication
- Conflict detection in distributed systems
- Event sourcing with multiple writers
func NewVectorClock ¶
func NewVectorClock() *VectorClock
NewVectorClock creates an empty VectorClock. This is the primary constructor for creating vector clocks.
func NewVectorClockFromMap ¶
func NewVectorClockFromMap(clocks map[string]uint64) *VectorClock
NewVectorClockFromMap creates a VectorClock from a map of node IDs to clock values. This is useful for testing or when you have clock data in map format. The input map is copied to prevent external mutations.
func NewVectorClockFromString ¶
func NewVectorClockFromString(data string) (*VectorClock, error)
NewVectorClockFromString attempts to deserialize a JSON string into a VectorClock. This is useful for reconstructing a version received from storage or over the network.
The expected format is a JSON object mapping node IDs to clock values: {"node-1": 5, "node-2": 3}
Returns an error if the input string is not valid JSON or contains invalid data.
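To make the expected format concrete, here is a minimal standalone sketch that decodes the same JSON shape with encoding/json. parseClock is a hypothetical helper; the package's actual parsing and validation rules are not shown in this documentation and may be stricter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parseClock decodes a JSON object of node IDs to counters. It is a
// hypothetical helper used only to illustrate the documented format.
func parseClock(data string) (map[string]uint64, error) {
	clocks := make(map[string]uint64)
	if err := json.Unmarshal([]byte(data), &clocks); err != nil {
		// Discard any partially decoded entries on failure.
		return nil, fmt.Errorf("parse vector clock: %w", err)
	}
	return clocks, nil
}

func main() {
	clocks, err := parseClock(`{"node-1": 5, "node-2": 3}`)
	fmt.Println(clocks, err) // map[node-1:5 node-2:3] <nil>

	// Negative or fractional counters do not fit uint64 and are rejected.
	_, err = parseClock(`{"node-1": -1}`)
	fmt.Println(err != nil) // true
}
```

Decoding into map[string]uint64 lets the JSON decoder reject negative and non-integer values without extra checks.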
func (*VectorClock) Clone ¶
func (vc *VectorClock) Clone() *VectorClock
Clone creates a deep copy of the VectorClock. This is useful when you need to create a snapshot or avoid mutations.
func (*VectorClock) Compare ¶
func (vc *VectorClock) Compare(other types.Version) int
Compare determines the causal relationship between two VectorClocks. It returns:
- -1: if this VectorClock happened-before the other (this ≺ other)
- 1: if this VectorClock happened-after the other (this ≻ other)
- 0: if they are concurrent or identical (this || other)
The comparison follows the standard vector clock partial ordering:
- A ≺ B if A[i] ≤ B[i] for all i, and A[j] < B[j] for at least one j
- A ≻ B if B ≺ A
- A || B if neither A ≺ B nor B ≺ A (concurrent)
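The ordering rules above can be sketched over plain maps as follows. This is an illustrative reimplementation under the assumption that a missing node ID counts as 0; dominance, compare, and concurrent are hypothetical names, not the package's internals. Since a result of 0 covers both equal and concurrent clocks, the sketch also shows how the two cases can be told apart.

```go
package main

import "fmt"

// dominance reports whether a is behind b on some node (aLess) and whether
// b is behind a on some node (bLess). Missing entries are treated as 0.
func dominance(a, b map[string]uint64) (aLess, bLess bool) {
	for id, av := range a {
		bv := b[id]
		if av < bv {
			aLess = true
		}
		if av > bv {
			bLess = true
		}
	}
	for id, bv := range b {
		if _, seen := a[id]; !seen && bv > 0 {
			aLess = true
		}
	}
	return aLess, bLess
}

// compare returns -1 if a happened-before b, 1 if a happened-after b,
// and 0 if the clocks are equal or concurrent.
func compare(a, b map[string]uint64) int {
	aLess, bLess := dominance(a, b)
	switch {
	case aLess && !bLess:
		return -1
	case bLess && !aLess:
		return 1
	default:
		return 0
	}
}

// concurrent is true only when each clock is ahead of the other somewhere.
func concurrent(a, b map[string]uint64) bool {
	aLess, bLess := dominance(a, b)
	return aLess && bLess
}

func main() {
	a := map[string]uint64{"node-1": 1}
	b := map[string]uint64{"node-1": 1, "node-2": 3}
	fmt.Println(compare(a, b)) // -1

	alice := map[string]uint64{"alice": 1, "server": 5}
	bob := map[string]uint64{"bob": 1, "server": 5}
	fmt.Println(compare(alice, bob), concurrent(alice, bob)) // 0 true
	fmt.Println(compare(a, a), concurrent(a, a))             // 0 false
}
```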
func (*VectorClock) GetAllClocks ¶
func (vc *VectorClock) GetAllClocks() map[string]uint64
GetAllClocks returns a copy of the internal clock map. This prevents external mutation of the vector clock's internal state.
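The defensive-copy behavior described here can be illustrated with a small standalone sketch. The clock type and its snapshot method are stand-ins invented for the example, since the real VectorClock fields are unexported.

```go
package main

import "fmt"

// clock is a stand-in type for this sketch, not the package's VectorClock.
type clock struct {
	entries map[string]uint64
}

// snapshot returns a copy of the entries so callers cannot mutate the clock.
func (c *clock) snapshot() map[string]uint64 {
	out := make(map[string]uint64, len(c.entries))
	for id, v := range c.entries {
		out[id] = v
	}
	return out
}

func main() {
	c := &clock{entries: map[string]uint64{"node-1": 5}}
	view := c.snapshot()
	view["node-1"] = 100
	fmt.Println(c.entries["node-1"], view["node-1"]) // 5 100
}
```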
func (*VectorClock) GetClock ¶
func (vc *VectorClock) GetClock(nodeID string) uint64
GetClock returns the clock value for a specific node ID. Returns 0 if the node ID has not been observed in this vector clock.
func (*VectorClock) HappenedAfter ¶
func (vc *VectorClock) HappenedAfter(other *VectorClock) bool
HappenedAfter returns true if this vector clock happened-after the other.
func (*VectorClock) HappenedBefore ¶
func (vc *VectorClock) HappenedBefore(other *VectorClock) bool
HappenedBefore returns true if this vector clock happened-before the other.
func (*VectorClock) Increment ¶
func (vc *VectorClock) Increment(nodeID string) error
Increment increases the logical clock for a given node ID. This should be called whenever a node generates a new event.
Example usage:
clock := NewVectorClock()
clock.Increment("node-1") // {"node-1": 1}
clock.Increment("node-1") // {"node-1": 2}
Returns an error if:
- The node ID is empty
- The node ID exceeds MaxNodeIDLength
- Adding this node would exceed MaxNodes
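The three error conditions can be sketched over a plain map as shown below. The increment function, its error messages, and the choice to measure the node ID length in bytes are assumptions made for illustration; the constants mirror the documented MaxNodeIDLength and MaxNodes values.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	maxNodeIDLength = 255  // mirrors the documented MaxNodeIDLength
	maxNodes        = 1000 // mirrors the documented MaxNodes
)

// increment bumps the counter for nodeID after validating it.
// It is a hypothetical sketch, not the library's implementation.
func increment(clock map[string]uint64, nodeID string) error {
	if nodeID == "" {
		return errors.New("node ID is empty")
	}
	if len(nodeID) > maxNodeIDLength { // length in bytes (assumption)
		return fmt.Errorf("node ID length %d exceeds %d", len(nodeID), maxNodeIDLength)
	}
	// Only a previously unseen node can push the clock past the limit.
	if _, tracked := clock[nodeID]; !tracked && len(clock) >= maxNodes {
		return fmt.Errorf("clock already tracks %d nodes", maxNodes)
	}
	clock[nodeID]++
	return nil
}

func main() {
	clock := map[string]uint64{}
	fmt.Println(increment(clock, "node-1"), clock) // <nil> map[node-1:1]
	fmt.Println(increment(clock, "node-1"), clock) // <nil> map[node-1:2]
	fmt.Println(increment(clock, "") != nil)       // true
}
```

Validating before mutating means a rejected call leaves the clock unchanged.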
func (*VectorClock) IsConcurrentWith ¶
func (vc *VectorClock) IsConcurrentWith(other *VectorClock) bool
IsConcurrentWith returns true if this vector clock is concurrent with another. Two vector clocks are concurrent if neither happened-before the other.
func (*VectorClock) IsEqual ¶
func (vc *VectorClock) IsEqual(other *VectorClock) bool
IsEqual returns true if two vector clocks are identical.
func (*VectorClock) IsZero ¶
func (vc *VectorClock) IsZero() bool
IsZero returns true if the VectorClock is empty (no nodes have been observed). This is equivalent to the initial state of a vector clock.
func (*VectorClock) Merge ¶
func (vc *VectorClock) Merge(other *VectorClock) error
Merge combines this vector clock with another, taking the maximum clock value for each node present in either clock. This is essential for maintaining causal history when a node receives events from another node.
The merge operation ensures that the resulting clock incorporates all the causal history that both clocks have observed.
Example:
clock1 := NewVectorClockFromMap(map[string]uint64{"node-1": 2, "node-2": 1})
clock2 := NewVectorClockFromMap(map[string]uint64{"node-1": 1, "node-3": 2})
clock1.Merge(clock2) // clock1 is now {"node-1": 2, "node-2": 1, "node-3": 2}
Returns an error if:
- The resulting merged clock would exceed MaxNodes
- Any node ID in the other clock exceeds MaxNodeIDLength
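The element-wise maximum at the heart of a merge can be sketched over plain maps. This hypothetical merge helper leaves out the MaxNodes and MaxNodeIDLength checks listed above and is not the package's implementation.

```go
package main

import "fmt"

// merge folds src into dst, keeping the larger counter for every node ID.
// Limit checks are omitted; this is an illustrative sketch only.
func merge(dst, src map[string]uint64) {
	for id, v := range src {
		if v > dst[id] { // a missing entry in dst reads as 0
			dst[id] = v
		}
	}
}

func main() {
	clock1 := map[string]uint64{"node-1": 2, "node-2": 1}
	clock2 := map[string]uint64{"node-1": 1, "node-3": 2}
	merge(clock1, clock2)
	fmt.Println(clock1) // map[node-1:2 node-2:1 node-3:2]
}
```

Taking the maximum makes the operation commutative and idempotent, so merging the same remote clock twice has no further effect.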
func (*VectorClock) Size ¶
func (vc *VectorClock) Size() int
Size returns the number of nodes tracked by this vector clock.
func (*VectorClock) String ¶
func (vc *VectorClock) String() string
String serializes the VectorClock to a JSON string for storage or transport. The format is a JSON object mapping node IDs to their clock values.
Examples:
- Empty clock: {}
- Single node: {"node-1":5}
- Multiple nodes: {"node-1":5,"node-2":3}
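The compact, key-sorted shape of these examples matches what encoding/json produces for a Go map, since json.Marshal writes map keys in sorted order. The sketch below shows that behavior with a hypothetical encodeClock helper; whether String is implemented this way is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeClock renders a clock as compact JSON with keys in sorted order.
// It is a hypothetical helper, not the package's String method.
func encodeClock(clocks map[string]uint64) string {
	if clocks == nil {
		clocks = map[string]uint64{} // a nil map would marshal as "null"
	}
	encoded, err := json.Marshal(clocks)
	if err != nil {
		return "{}" // not expected for a map of strings to integers
	}
	return string(encoded)
}

func main() {
	fmt.Println(encodeClock(nil))                                         // {}
	fmt.Println(encodeClock(map[string]uint64{"node-2": 3, "node-1": 5})) // {"node-1":5,"node-2":3}
}
```

A deterministic key order means two equal clocks always serialize to the same string, which is convenient when serialized versions are compared or stored.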
type VectorClockError ¶ added in v0.5.0
type VectorClockError struct {
	Msg string
}
VectorClockError represents errors that can occur during vector clock operations.
func (*VectorClockError) Error ¶ added in v0.5.0
func (e *VectorClockError) Error() string
type VectorClockManager ¶
type VectorClockManager struct {
	// contains filtered or unexported fields
}
VectorClockManager implements VersionManager for vector clock versioning.
func NewVectorClockManager ¶
func NewVectorClockManager() *VectorClockManager
NewVectorClockManager creates a new vector clock version manager.
func NewVectorClockManagerFromVersion ¶
func NewVectorClockManagerFromVersion(version types.Version) (*VectorClockManager, error)
NewVectorClockManagerFromVersion creates a vector clock manager from an existing version.
func (*VectorClockManager) Clone ¶
func (vm *VectorClockManager) Clone() VersionManager
Clone creates a copy of the version manager.
func (*VectorClockManager) CurrentVersion ¶
func (vm *VectorClockManager) CurrentVersion() types.Version
CurrentVersion returns the current vector clock state.
func (*VectorClockManager) NextVersion ¶
func (vm *VectorClockManager) NextVersion(nodeID string) types.Version
NextVersion increments the clock for the given node and returns the new version.
func (*VectorClockManager) UpdateFromVersion ¶
func (vm *VectorClockManager) UpdateFromVersion(version types.Version) error
UpdateFromVersion merges the observed version into the current state.
type VersionManager ¶
type VersionManager interface {
	// CurrentVersion returns the current version state
	CurrentVersion() types.Version

	// NextVersion generates the next version for a new event
	// The nodeID parameter allows node-specific versioning (e.g., for vector clocks)
	NextVersion(nodeID string) types.Version

	// UpdateFromVersion updates the internal state based on an observed version
	// This is used when loading events from the store or receiving from peers
	UpdateFromVersion(version types.Version) error

	// Clone creates a copy of the version manager
	Clone() VersionManager
}
VersionManager defines the interface for managing version state. This allows different versioning strategies to be plugged in.
type VersionedStore ¶
type VersionedStore struct {
	// contains filtered or unexported fields
}
VersionedStore is a decorator for an EventStore that manages versioning automatically. It uses a pluggable VersionManager to handle different versioning strategies.
func NewVersionedStore ¶
func NewVersionedStore(store synckit.EventStore, nodeID string, versionManager VersionManager) (*VersionedStore, error)
NewVersionedStore creates a new versioned store decorator. It automatically initializes the version manager from the store's latest version.
func NewVersionedStoreWithLogger ¶ added in v0.12.0
func NewVersionedStoreWithLogger(store synckit.EventStore, nodeID string, versionManager VersionManager, logger *slog.Logger) (*VersionedStore, error)
NewVersionedStoreWithLogger creates a new versioned store decorator with a custom logger. It automatically initializes the version manager from the store's latest version.
func (*VersionedStore) Close ¶
func (s *VersionedStore) Close() error
Close delegates to the underlying store.
func (*VersionedStore) GetVersionManager ¶
func (s *VersionedStore) GetVersionManager() VersionManager
GetVersionManager returns the current version manager (useful for testing or advanced use cases).
func (*VersionedStore) LatestVersion ¶
func (s *VersionedStore) LatestVersion(ctx context.Context) (synckit.Version, error)
LatestVersion returns the highest version number in the store.
func (*VersionedStore) Load ¶
func (s *VersionedStore) Load(ctx context.Context, since synckit.Version) ([]synckit.EventWithVersion, error)
Load passes through to the underlying store and updates version manager state.
func (*VersionedStore) LoadByAggregate ¶
func (s *VersionedStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version) ([]synckit.EventWithVersion, error)
LoadByAggregate passes through to the underlying store and updates version manager state.
func (*VersionedStore) ParseVersion ¶
func (s *VersionedStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)
ParseVersion delegates to the underlying store.
func (*VersionedStore) SetNodeID ¶
func (s *VersionedStore) SetNodeID(nodeID string)
SetNodeID updates the node ID for version generation.