sync

package module
v0.5.0 Latest
Published: Aug 3, 2025 License: MIT Imports: 6 Imported by: 0

README

Go Sync Kit

A generic, event-driven synchronization library for distributed Go applications. Go Sync Kit enables offline-first architectures with conflict resolution and pluggable storage backends.

🤖 Project Origins & Collaboration

Transparency First: This project was created with the assistance of Large Language Models (LLMs) as a starting point for a collaborative open-source initiative.

Why I'm Sharing This: I'm passionate about learning Go and improving my programming skills. Rather than keeping this as a solo project, I'm open-sourcing it to:

  • Learn from the community - Your code reviews, suggestions, and contributions will help me grow as a developer
  • Collaborate together - Let's build something useful while learning from each other
  • Test the concept - See if there's genuine interest in this approach to synchronization in Go
  • Practice open-source - Experience the full cycle of maintaining and growing an open-source project

If you're interested in mentoring, contributing, or learning alongside me, I'd love to hear from you! Whether you're a Go expert or fellow learner, all perspectives are valuable.

💡 Note: While LLMs helped with the initial implementation, all future development will be driven by real-world needs, community feedback, and collaborative human insight.

Features

  • Event-Driven Architecture: Built around append-only event streams
  • Offline-First: Full support for offline operation with automatic sync when reconnected
  • Pluggable Components: Interfaces for storage, transport, versioning, and conflict resolution
  • Conflict Resolution: Multiple strategies including last-write-wins, merge, and custom resolvers
  • Concurrent Safe: Thread-safe operations with proper synchronization
  • Transport Agnostic: Works with HTTP, gRPC, WebSockets, NATS, or any custom transport
  • Storage Agnostic: Compatible with SQLite, BadgerDB, PostgreSQL, or any storage backend
  • Automatic Sync: Configurable periodic synchronization
  • Filtering: Sync only specific events based on custom filters
  • Batching: Efficient batch processing for large event sets
  • Configurable Options: New builder methods allow enabling validation, setting sync timeouts, and enabling compression for transport

Installation

go get github.com/c0deZ3R0/go-sync-kit

Quick Start

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/c0deZ3R0/go-sync-kit/storage/sqlite"
    sync "github.com/c0deZ3R0/go-sync-kit"
    transport "github.com/c0deZ3R0/go-sync-kit/transport/http"
)

type MyEvent struct {
    id          string
    eventType   string
    aggregateID string
    data        interface{}
    metadata    map[string]interface{}
}

func (e *MyEvent) ID() string { return e.id }
func (e *MyEvent) Type() string { return e.eventType }
func (e *MyEvent) AggregateID() string { return e.aggregateID }
func (e *MyEvent) Data() interface{} { return e.data }
func (e *MyEvent) Metadata() map[string]interface{} { return e.metadata }

func main() {
    // Create SQLite Event Store
    storeConfig := &sqlite.Config{DataSourceName: "file:events.db", EnableWAL: true}
    store, err := sqlite.New(storeConfig)
    if err != nil {
        log.Fatalf("Failed to create SQLite store: %v", err)
    }
    defer store.Close()

    // Set up HTTP server with SyncHandler
    logger := log.New(os.Stdout, "[SyncHandler] ", log.LstdFlags)
    handler := transport.NewSyncHandler(store, logger)
    server := &http.Server{Addr: ":8080", Handler: handler}
	
    go func() {
        if err := server.ListenAndServe(); err != nil {
            log.Fatalf("Failed to start server: %v", err)
        }
    }()

    // Set up HTTP Client with HTTPTransport
    clientTransport := transport.NewTransport("http://localhost:8080", nil)

    // Configure Sync Options
    syncOptions := &sync.SyncOptions{
        BatchSize: 10,
        SyncInterval: 10 * time.Second,
    }

    // Create and start SyncManager
    syncManager := sync.NewSyncManager(store, clientTransport, syncOptions)
    ctx := context.Background()

    // Run synchronization
    result, err := syncManager.Sync(ctx)
    if err != nil {
        log.Fatalf("Sync error: %v", err)
    }
    log.Printf("Sync completed: %+v", result)
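}

A second variant wires in a custom Transport and ConflictResolver. MyTransport and LastWriteWinsResolver stand for types you supply yourself:

package main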

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/c0deZ3R0/go-sync-kit/storage/sqlite"
    sync "github.com/c0deZ3R0/go-sync-kit"
)

// Implement your event type
type MyEvent struct {
    id          string
    eventType   string
    aggregateID string
    data        interface{}
    metadata    map[string]interface{}
}

func (e *MyEvent) ID() string { return e.id }
func (e *MyEvent) Type() string { return e.eventType }
func (e *MyEvent) AggregateID() string { return e.aggregateID }
func (e *MyEvent) Data() interface{} { return e.data }
func (e *MyEvent) Metadata() map[string]interface{} { return e.metadata }

func main() {
    // Create an SQLite event store
    logger := log.New(os.Stdout, "[SQLite EventStore] ", log.LstdFlags)
    config := &sqlite.Config{
        DataSourceName: "file:events.db",
        Logger:         logger,
        EnableWAL:      true,  // Enable WAL for better concurrency
    }

    store, err := sqlite.New(config)
    if err != nil {
        log.Fatalf("Failed to create SQLite store: %v", err)
    }
    defer store.Close()

    // Create a transport
    transport := &MyTransport{} // Your Transport implementation

    // Configure sync options
    options := &sync.SyncOptions{
        BatchSize:        100,
        SyncInterval:     30 * time.Second,
        ConflictResolver: &LastWriteWinsResolver{},
    }

    // Create sync manager
    syncManager := sync.NewSyncManager(store, transport, options)

    // Perform sync
    ctx := context.Background()
    result, err := syncManager.Sync(ctx)
    if err != nil {
        log.Fatalf("Sync failed: %v", err)
    }

    fmt.Printf("Synced: %d pushed, %d pulled\n", 
        result.EventsPushed, result.EventsPulled)
}

Release Notes

This is the initial public release (v0.1.0) of Go Sync Kit. Here’s what you can expect:

  • Functional core features ready for testing
  • API design is solidifying but may evolve
  • Seeking feedback from early adopters
  • Not for production-critical applications yet

Your feedback and contributions are invaluable as we work towards a stable v1.0.0 release. Try it out, report issues, suggest improvements, and help shape the future of Go Sync Kit!

Architecture

Go Sync Kit follows clean architecture principles with clear separation of concerns:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Application   │    │   SyncManager   │    │    Transport    │
│                 │───▶│                 │───▶│                 │
│  (Your Code)    │    │  (Coordination) │    │   (Network)     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                               │
                               ▼
                    ┌─────────────────┐    ┌─────────────────┐
                    │   EventStore    │    │ConflictResolver │
                    │                 │    │                 │
                    │   (Storage)     │    │  (Resolution)   │
                    └─────────────────┘    └─────────────────┘

Core Interfaces

Event

Represents a syncable event in your system:

type Event interface {
    ID() string
    Type() string
    AggregateID() string
    Data() interface{}
    Metadata() map[string]interface{}
}

Version

Handles versioning for sync operations:

type Version interface {
    Compare(other Version) int
    String() string
    IsZero() bool
}

EventStore

Provides persistence for events:

type EventStore interface {
    Store(ctx context.Context, event Event, version Version) error
    Load(ctx context.Context, since Version) ([]EventWithVersion, error)
    LoadByAggregate(ctx context.Context, aggregateID string, since Version) ([]EventWithVersion, error)
    LatestVersion(ctx context.Context) (Version, error)
    ParseVersion(ctx context.Context, versionStr string) (Version, error)
    Close() error
}

Transport

Handles network communication:

type Transport interface {
    Push(ctx context.Context, events []EventWithVersion) error
    Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
    Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error
    Close() error
}

ConflictResolver

Resolves conflicts when the same data is modified concurrently:

type ConflictResolver interface {
    Resolve(ctx context.Context, local, remote []EventWithVersion) ([]EventWithVersion, error)
}

Conflict Resolution Strategies

Go Sync Kit supports multiple conflict resolution strategies:

Last-Write-Wins

type LastWriteWinsResolver struct{}

func (r *LastWriteWinsResolver) Resolve(ctx context.Context, local, remote []EventWithVersion) ([]EventWithVersion, error) {
    // Keep the events with the latest timestamp
    var resolved []EventWithVersion
    // Implementation logic here...
    return resolved, nil
}

Custom Merge Strategy

type CustomMergeResolver struct{}

func (r *CustomMergeResolver) Resolve(ctx context.Context, local, remote []EventWithVersion) ([]EventWithVersion, error) {
    // Implement your custom merge logic here
    // (merge data fields, prompt the user, etc.) and collect the results.
    var mergedEvents []EventWithVersion
    return mergedEvents, nil
}

Configuration Options

type SyncOptions struct {
    // Sync direction control
    PushOnly bool
    PullOnly bool
    
    // Conflict handling
    ConflictResolver ConflictResolver
    
    // Event filtering
    Filter func(Event) bool
    
    // Performance tuning
    BatchSize int
    SyncInterval time.Duration
}

Example with filtering:

options := &sync.SyncOptions{
    BatchSize: 50,
    Filter: func(e sync.Event) bool {
        // Only sync specific event types
        return e.Type() == "UserCreated" || e.Type() == "OrderPlaced"
    },
}
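
To illustrate how Filter and BatchSize could interact, here is a small self-contained sketch. It is not the library's implementation: `filterAndBatch` and `demoEvent` are hypothetical names, and the local `Event` interface is trimmed to the one method the example needs. The idea is simply to drop events the filter rejects and then split the rest into chunks of at most the batch size.

```go
package main

import "fmt"

// Event is a trimmed local stand-in for the library's Event interface.
type Event interface {
	Type() string
}

// demoEvent is a hypothetical event type for this sketch.
type demoEvent struct{ eventType string }

func (e demoEvent) Type() string { return e.eventType }

// filterAndBatch is a hypothetical helper. It keeps the events accepted by
// filter (a nil filter accepts everything) and groups them into batches of
// at most batchSize events.
func filterAndBatch(events []Event, filter func(Event) bool, batchSize int) [][]Event {
	if batchSize <= 0 {
		batchSize = 1
	}
	var kept []Event
	for _, e := range events {
		if filter == nil || filter(e) {
			kept = append(kept, e)
		}
	}
	var batches [][]Event
	for start := 0; start < len(kept); start += batchSize {
		end := start + batchSize
		if end > len(kept) {
			end = len(kept)
		}
		batches = append(batches, kept[start:end])
	}
	return batches
}

func main() {
	events := []Event{
		demoEvent{"UserCreated"},
		demoEvent{"CartViewed"},
		demoEvent{"OrderPlaced"},
		demoEvent{"UserCreated"},
	}
	onlySynced := func(e Event) bool {
		return e.Type() == "UserCreated" || e.Type() == "OrderPlaced"
	}
	for i, batch := range filterAndBatch(events, onlySynced, 2) {
		fmt.Printf("batch %d: %d events\n", i, len(batch))
	}
}
```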

Versioning Strategies

Go Sync Kit supports multiple versioning strategies suitable for different architectures.

For multi-master, peer-to-peer, or offline-first scenarios where writes can happen on multiple nodes concurrently, using a vector clock is the recommended approach. The library provides a VersionedStore decorator that manages versioning logic automatically.

Key Benefits:

  • Causal ordering: Determines if events happened-before, happened-after, or are concurrent
  • Conflict detection: Automatically identifies conflicting concurrent writes
  • Distributed-friendly: No central coordination required
  • Offline-first: Works perfectly for disconnected clients

Usage:

import (
    "github.com/c0deZ3R0/go-sync-kit/storage/sqlite"
    "github.com/c0deZ3R0/go-sync-kit/version"
    sync "github.com/c0deZ3R0/go-sync-kit"
)

// 1. Create a base store (e.g., SQLite)
storeConfig := &sqlite.Config{DataSourceName: "file:events.db", EnableWAL: true}
baseStore, err := sqlite.New(storeConfig)
if err != nil {
    // handle error
}

// 2. Define a unique ID for the current node
nodeID := "client-A"

// 3. Create a vector clock version manager
versionManager := version.NewVectorClockManager()

// 4. Wrap the base store with the VersionedStore decorator
versionedStore, err := version.NewVersionedStore(baseStore, nodeID, versionManager)
if err != nil {
    // handle error
}

// 5. Use the decorated store. It now handles vector clock versioning automatically.
syncManager := sync.NewSyncManager(versionedStore, transport, options)

// When you store an event, the version is managed automatically
err = versionedStore.Store(ctx, myNewEvent, nil) // nil means auto-generate version

Real-world Example:

// Node A creates an event
nodeAStore.Store(ctx, userCreatedEvent, nil)
// Result: {"A": 1}

// Node B creates an event independently  
nodeBStore.Store(ctx, orderPlacedEvent, nil)
// Result: {"B": 1}

// When nodes sync, vector clocks detect concurrent operations
// and enable proper conflict resolution
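
To make the causal-ordering idea concrete, the following sketch compares two vector clocks and reports whether one happened before the other or whether they are concurrent. It is independent of the library: `VectorClock`, `Ordering`, `Compare`, and `Merge` are names invented for this illustration and say nothing about how the version package is implemented.

```go
package main

import "fmt"

// VectorClock maps a node ID to the number of events that node has produced.
type VectorClock map[string]uint64

// Ordering is the causal relationship between two clocks.
type Ordering int

const (
	Equal Ordering = iota
	Before
	After
	Concurrent
)

func (o Ordering) String() string {
	return [...]string{"equal", "before", "after", "concurrent"}[o]
}

// Compare reports how a relates to b. Missing entries count as zero.
func Compare(a, b VectorClock) Ordering {
	aLess, bLess := false, false
	for node, av := range a {
		bv := b[node]
		if av < bv {
			aLess = true
		} else if av > bv {
			bLess = true
		}
	}
	for node, bv := range b {
		if _, ok := a[node]; !ok && bv > 0 {
			aLess = true
		}
	}
	switch {
	case aLess && bLess:
		return Concurrent
	case aLess:
		return Before
	case bLess:
		return After
	}
	return Equal
}

// Merge returns the element-wise maximum of both clocks, which is the clock
// a node holds after it has seen both histories.
func Merge(a, b VectorClock) VectorClock {
	out := make(VectorClock, len(a)+len(b))
	for node, v := range a {
		out[node] = v
	}
	for node, v := range b {
		if v > out[node] {
			out[node] = v
		}
	}
	return out
}

func main() {
	nodeA := VectorClock{"A": 1}
	nodeB := VectorClock{"B": 1}
	fmt.Println(Compare(nodeA, nodeB))               // independent writes
	fmt.Println(Compare(nodeA, Merge(nodeA, nodeB))) // merged clock is newer
}
```

Two clocks are concurrent exactly when each one is ahead of the other on at least one node, which is the situation a conflict resolver has to handle.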

Simple Versioning (Default)

For single-master or centralized scenarios, you can use simpler versioning strategies like timestamps or sequential IDs. The underlying storage implementations (like SQLite) handle this automatically.

// SQLite store uses timestamp-based versioning by default
store, err := sqlite.New(config)
// No decorator needed - works out of the box

Custom Versioning Strategies

You can implement your own versioning strategy by implementing the VersionManager interface:

type CustomVersionManager struct {
    // Your custom state
}

func (vm *CustomVersionManager) CurrentVersion() sync.Version {
    // Return current version
}

func (vm *CustomVersionManager) NextVersion(nodeID string) sync.Version {
    // Generate next version
}

func (vm *CustomVersionManager) UpdateFromVersion(version sync.Version) error {
    // Update internal state from observed version
}

func (vm *CustomVersionManager) Clone() VersionManager {
    // Create a copy
}

Storage Implementations

SQLite Example

type SQLiteEventStore struct {
    db *sql.DB
}

func (s *SQLiteEventStore) Store(ctx context.Context, event Event, version Version) error {
    // Serialize the payload first: database/sql cannot bind arbitrary values such as maps or structs.
    payload, err := json.Marshal(event.Data())
    if err != nil {
        return err
    }
    query := `INSERT INTO events (id, type, aggregate_id, data, version) VALUES (?, ?, ?, ?, ?)`
    _, err = s.db.ExecContext(ctx, query, event.ID(), event.Type(),
        event.AggregateID(), payload, version.String())
    return err
}

// Implement other EventStore methods...

BadgerDB Example

type BadgerEventStore struct {
    db *badger.DB
}

func (b *BadgerEventStore) Store(ctx context.Context, event Event, version Version) error {
    return b.db.Update(func(txn *badger.Txn) error {
        key := []byte(fmt.Sprintf("event:%s", event.ID()))
        eventData := EventWithVersion{Event: event, Version: version}
        data, err := json.Marshal(eventData)
        if err != nil {
            return err
        }
        return txn.Set(key, data)
    })
}

Transport Implementations

Built-in HTTP Transport

Go Sync Kit includes a production-ready HTTP transport implementation that provides both client and server components.

Client Setup

import (
    sync "github.com/c0deZ3R0/go-sync-kit"
    transport "github.com/c0deZ3R0/go-sync-kit/transport/http"
)

// Create HTTP transport client
clientTransport := transport.NewTransport("http://localhost:8080", nil)

// Use with SyncManager
syncManager := sync.NewSyncManager(store, clientTransport, options)

Server Setup

import (
    "log"
    "net/http"
    "os"

    // Aliased so it does not clash with the standard net/http package
    transport "github.com/c0deZ3R0/go-sync-kit/transport/http"
)

// Create HTTP sync handler
logger := log.New(os.Stdout, "[SyncHandler] ", log.LstdFlags)
handler := transport.NewSyncHandler(store, logger)

// Start HTTP server
server := &http.Server{Addr: ":8080", Handler: handler}
go server.ListenAndServe()

API Endpoints

The HTTP transport provides two RESTful endpoints:

  • POST /push - Accepts events to be stored on the server
  • GET /pull?since= - Returns events since the specified version
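
As a small illustration of the pull endpoint, the sketch below builds the request URL with the standard library so the `since` value is query-escaped correctly. `pullURL` is a hypothetical helper, and the format of the version string is an assumption; the README does not specify what the server expects there.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// pullURL is a hypothetical helper that appends /pull to the base URL and
// sets the since query parameter. The version string format is assumed.
func pullURL(baseURL, since string) (string, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return "", err
	}
	u.Path = strings.TrimRight(u.Path, "/") + "/pull"
	q := u.Query()
	q.Set("since", since)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	target, err := pullURL("http://localhost:8080", "42")
	if err != nil {
		fmt.Println("invalid base URL:", err)
		return
	}
	fmt.Println(target)
}
```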

Features

  • JSON serialization with proper interface handling
  • Context cancellation support
  • Comprehensive error handling with HTTP status codes
  • Storage-agnostic server implementation
  • Configurable HTTP client for custom timeouts, TLS, etc.
  • Batch processing for efficient sync operations

Complete Client/Server Example

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "time"

    "github.com/c0deZ3R0/go-sync-kit/storage/sqlite"
    sync "github.com/c0deZ3R0/go-sync-kit"
    transport "github.com/c0deZ3R0/go-sync-kit/transport/http"
)

func main() {
    // 1. Create SQLite store
    store, err := sqlite.NewWithDataSource("file:events.db")
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close()

    // 2. Start HTTP server
    logger := log.New(os.Stdout, "[SyncHandler] ", log.LstdFlags)
    handler := transport.NewSyncHandler(store, logger)
    server := &http.Server{Addr: ":8080", Handler: handler}
    
    go func() {
        log.Println("Starting sync server on :8080")
        if err := server.ListenAndServe(); err != nil {
            log.Printf("Server error: %v", err)
        }
    }()

    // Give server time to start
    time.Sleep(100 * time.Millisecond)

    // 3. Create HTTP client transport
    clientTransport := transport.NewTransport("http://localhost:8080", nil)

    // 4. Configure sync options
    syncOptions := &sync.SyncOptions{
        BatchSize: 10,
        SyncInterval: 10 * time.Second,
    }

    // 5. Create sync manager
    syncManager := sync.NewSyncManager(store, clientTransport, syncOptions)

    // 6. Perform synchronization
    ctx := context.Background()
    result, err := syncManager.Sync(ctx)
    if err != nil {
        log.Fatalf("Sync failed: %v", err)
    }

    log.Printf("Sync completed: %d pushed, %d pulled", 
        result.EventsPushed, result.EventsPulled)
}

Custom HTTP Transport Example

type CustomHTTPTransport struct {
    client  *http.Client
    baseURL string
}

func (h *CustomHTTPTransport) Push(ctx context.Context, events []EventWithVersion) error {
    data, err := json.Marshal(events)
    if err != nil {
        return err
    }

    req, err := http.NewRequestWithContext(ctx, http.MethodPost,
        h.baseURL+"/custom/push", bytes.NewReader(data))
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/json")

    resp, err := h.client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // Treat any non-2xx response as a failed push
    if resp.StatusCode < 200 || resp.StatusCode > 299 {
        return fmt.Errorf("push failed: %s", resp.Status)
    }
    return nil
}

NATS Transport Example (with Watermill)

type NATSTransport struct {
    publisher  message.Publisher
    subscriber message.Subscriber
}

func (n *NATSTransport) Push(ctx context.Context, events []EventWithVersion) error {
    for _, event := range events {
        data, err := json.Marshal(event)
        if err != nil {
            return err
        }
        
        msg := message.NewMessage(watermill.NewUUID(), data)
        err = n.publisher.Publish("sync.events", msg)
        if err != nil {
            return err
        }
    }
    return nil
}

Advanced Usage

Automatic Sync

// Start automatic sync at the configured SyncInterval
ctx := context.Background()
err := syncManager.StartAutoSync(ctx)
if err != nil {
    log.Fatal(err)
}

// Stop automatic sync
err = syncManager.StopAutoSync()

Event Subscriptions

err := syncManager.Subscribe(func(result *sync.SyncResult) {
    log.Printf("Sync completed: %d events pushed, %d pulled, %d conflicts resolved",
        result.EventsPushed, result.EventsPulled, result.ConflictsResolved)

    if len(result.Errors) > 0 {
        log.Printf("Sync errors: %v", result.Errors)
    }
})

Manual Push/Pull

// Push only local changes
pushResult, err := syncManager.Push(ctx)

// Pull only remote changes
pullResult, err := syncManager.Pull(ctx)

Testing

Go Sync Kit is designed for testability with mock implementations included:

func TestMySync(t *testing.T) {
    store := &MockEventStore{}
    transport := &MockTransport{}
    resolver := &MockConflictResolver{}
    
    sm := sync.NewSyncManager(store, transport, &sync.SyncOptions{
        ConflictResolver: resolver,
    })
    
    // Test your sync logic
    result, err := sm.Sync(context.Background())
    assert.NoError(t, err)
    assert.Equal(t, 1, result.EventsPushed)
}

Run tests:

go test ./...

Performance Considerations

  • Batching: Use appropriate batch sizes for your network conditions
  • Filtering: Apply filters to reduce sync overhead
  • Storage: Choose storage backends appropriate for your scale
  • Conflict Resolution: Simple strategies (like last-write-wins) are faster than complex merging

Roadmap

  • SQLite EventStore - Production-ready SQLite implementation with WAL support
  • Vector Clock Versioning - Complete implementation with VersionedStore decorator
  • Built-in storage implementations (BadgerDB, PostgreSQL)
  • Built-in transport implementations (HTTP)
  • Built-in transport implementations (gRPC, WebSocket)
  • Compression support for large event payloads
  • Metrics and observability hooks
  • Schema evolution support

Contributing

We're actively seeking feedback and contributions! As an early-stage project (v0.1.0), your input is especially valuable.

Ways to Contribute:
  • Try it out and report your experience
  • Open issues for bugs, feature requests, or API suggestions
  • Share feedback on the API design and usability
  • Contribute code improvements and new features
  • Write examples showing real-world usage
  • Mentor & teach - Help me learn Go best practices and patterns
  • Learn together - If you're also learning Go, let's collaborate and grow together
  • Code reviews - Point out improvements, suggest better approaches, or explain Go idioms

Code Contributions:
  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Feedback & Discussion:
  • Open an issue to discuss API changes or improvements
  • Share your use case and how Go Sync Kit fits (or doesn't fit)
  • Suggest better naming, patterns, or architectural improvements

License

This project is licensed under the MIT License - see the LICENSE file for details.

Inspiration

This project was inspired by:

  • Go Kit for clean interface design
  • PouchDB for sync protocol concepts
  • CouchDB for replication patterns
  • Watermill for event streaming architecture

Documentation

Overview

Package sync provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.

Types

type BackoffStrategy

type BackoffStrategy interface {
	// NextDelay returns the delay before the next reconnection attempt
	NextDelay(attempt int) time.Duration

	// Reset resets the backoff strategy after a successful connection
	Reset()
}

BackoffStrategy defines how to handle reconnection delays

type ConflictResolver

type ConflictResolver interface {
	// Resolve handles a conflict between local and remote events
	// Returns the resolved events to keep
	Resolve(ctx context.Context, local, remote []EventWithVersion) ([]EventWithVersion, error)
}

ConflictResolver handles conflicts when the same data is modified concurrently. Different strategies can be plugged in (last-write-wins, merge, user-prompt, etc.).

type ConnectionStatus

type ConnectionStatus struct {
	Connected         bool
	LastConnected     time.Time
	ReconnectAttempts int
	Error             error
}

ConnectionStatus represents the state of the real-time connection

type Event

type Event interface {
	// ID returns a unique identifier for this event
	ID() string

	// Type returns the event type (e.g., "UserCreated", "OrderUpdated")
	Type() string

	// AggregateID returns the ID of the aggregate this event belongs to
	AggregateID() string

	// Data returns the event payload
	Data() interface{}

	// Metadata returns additional event metadata
	Metadata() map[string]interface{}
}

Event represents a syncable event in the system. This interface should be implemented by user's event types.

type EventStore

type EventStore interface {
	// Store persists an event to the store
	Store(ctx context.Context, event Event, version Version) error

	// Load retrieves all events since the given version
	Load(ctx context.Context, since Version) ([]EventWithVersion, error)

	// LoadByAggregate retrieves events for a specific aggregate since the given version
	LoadByAggregate(ctx context.Context, aggregateID string, since Version) ([]EventWithVersion, error)

	// LatestVersion returns the latest version in the store
	LatestVersion(ctx context.Context) (Version, error)

	// ParseVersion converts a string representation into a Version
	// This allows different storage implementations to handle their own version formats
	ParseVersion(ctx context.Context, versionStr string) (Version, error)

	// Close closes the store and releases resources
	Close() error
}

EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).

type EventWithVersion

type EventWithVersion struct {
	Event   Event
	Version Version
}

EventWithVersion pairs an event with its version information

type ExponentialBackoff

type ExponentialBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

ExponentialBackoff implements exponential backoff with jitter

func (*ExponentialBackoff) NextDelay

func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration

func (*ExponentialBackoff) Reset

func (eb *ExponentialBackoff) Reset()

type MetricsCollector added in v0.5.0

type MetricsCollector interface {
	// RecordSyncDuration records how long a sync operation took
	RecordSyncDuration(operation string, duration time.Duration)

	// RecordSyncEvents records the number of events pushed and pulled
	RecordSyncEvents(pushed, pulled int)

	// RecordSyncErrors records sync operation errors by type
	RecordSyncErrors(operation string, errorType string)

	// RecordConflicts records the number of conflicts resolved
	RecordConflicts(resolved int)
}

MetricsCollector provides hooks for collecting sync operation metrics

type MockMetricsCollector added in v0.5.0

type MockMetricsCollector struct {
	DurationCalls []struct {
		Operation string
		Duration  time.Duration
	}
	EventCalls []struct {
		Pushed, Pulled int
	}
	ErrorCalls []struct {
		Operation, ErrorType string
	}
	ConflictCalls []struct {
		Resolved int
	}
}

MockMetricsCollector implements MetricsCollector interface for testing

func (*MockMetricsCollector) RecordConflicts added in v0.5.0

func (m *MockMetricsCollector) RecordConflicts(resolved int)

func (*MockMetricsCollector) RecordSyncDuration added in v0.5.0

func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*MockMetricsCollector) RecordSyncErrors added in v0.5.0

func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*MockMetricsCollector) RecordSyncEvents added in v0.5.0

func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)

type NoOpMetricsCollector added in v0.5.0

type NoOpMetricsCollector struct{}

NoOpMetricsCollector is a default implementation that does nothing

func (*NoOpMetricsCollector) RecordConflicts added in v0.5.0

func (n *NoOpMetricsCollector) RecordConflicts(resolved int)

func (*NoOpMetricsCollector) RecordSyncDuration added in v0.5.0

func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*NoOpMetricsCollector) RecordSyncErrors added in v0.5.0

func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*NoOpMetricsCollector) RecordSyncEvents added in v0.5.0

func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)

type Notification

type Notification struct {
	// Type of notification (data_changed, sync_requested, etc.)
	Type string

	// Source identifies where the change originated
	Source string

	// AggregateIDs that were affected (optional, for filtering)
	AggregateIDs []string

	// Metadata for additional context
	Metadata map[string]interface{}

	// Timestamp when the notification was created
	Timestamp time.Time
}

Notification represents a real-time update notification

type NotificationFilter

type NotificationFilter func(notification Notification) bool

NotificationFilter allows filtering which notifications to process
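
A filter of this shape is easy to build from a closure. The sketch below is one possible example, not library code: `forAggregates` is a hypothetical constructor, the two types are local copies of the documented declarations, and letting notifications without any aggregate IDs pass through is a design choice made here because the field is documented as optional.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented types, so the sketch compiles on its own.
type Notification struct {
	Type         string
	Source       string
	AggregateIDs []string
	Metadata     map[string]interface{}
	Timestamp    time.Time
}

type NotificationFilter func(notification Notification) bool

// forAggregates is a hypothetical constructor. The returned filter accepts a
// notification if it names at least one of the given aggregate IDs.
// Notifications that name no aggregates at all are also accepted.
func forAggregates(ids ...string) NotificationFilter {
	wanted := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		wanted[id] = struct{}{}
	}
	return func(n Notification) bool {
		if len(n.AggregateIDs) == 0 {
			return true
		}
		for _, id := range n.AggregateIDs {
			if _, ok := wanted[id]; ok {
				return true
			}
		}
		return false
	}
}

func main() {
	filter := forAggregates("user-1", "order-7")
	n := Notification{
		Type:         "data_changed",
		Source:       "server",
		AggregateIDs: []string{"order-7"},
		Timestamp:    time.Now(),
	}
	fmt.Println("process notification:", filter(n))
}
```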

type NotificationHandler

type NotificationHandler func(notification Notification) error

NotificationHandler processes incoming real-time notifications

type RealtimeNotifier

type RealtimeNotifier interface {
	// Subscribe starts listening for real-time notifications
	// The handler will be called when new data is available for sync
	Subscribe(ctx context.Context, handler NotificationHandler) error

	// Unsubscribe stops listening for notifications
	Unsubscribe() error

	// Notify sends a notification to connected clients (server-side)
	Notify(ctx context.Context, notification Notification) error

	// IsConnected returns true if the real-time connection is active
	IsConnected() bool

	// Close closes the notifier connection
	Close() error
}

RealtimeNotifier provides real-time push notifications when data changes. This is separate from Transport to allow for different notification mechanisms (WebSockets, SSE, NATS, etc.) while keeping the sync logic agnostic.

type RealtimeSyncManager

type RealtimeSyncManager interface {
	SyncManager

	// EnableRealtime starts real-time notifications
	EnableRealtime(ctx context.Context) error

	// DisableRealtime stops real-time notifications and falls back to polling
	DisableRealtime() error

	// IsRealtimeActive returns true if real-time notifications are active
	IsRealtimeActive() bool

	// GetConnectionStatus returns the current real-time connection status
	GetConnectionStatus() ConnectionStatus
}

RealtimeSyncManager extends SyncManager with real-time capabilities

func NewRealtimeSyncManager

func NewRealtimeSyncManager(store EventStore, transport Transport, options *RealtimeSyncOptions) RealtimeSyncManager

NewRealtimeSyncManager creates a sync manager with real-time capabilities

type RealtimeSyncOptions

type RealtimeSyncOptions struct {
	SyncOptions

	// RealtimeNotifier for push notifications (optional)
	RealtimeNotifier RealtimeNotifier

	// NotificationFilter to process only relevant notifications
	NotificationFilter NotificationFilter

	// DisablePolling stops periodic sync when real-time is active
	DisablePolling bool

	// FallbackInterval for polling when real-time connection is lost
	FallbackInterval time.Duration

	// ReconnectBackoff strategy for reconnecting to real-time notifications
	ReconnectBackoff BackoffStrategy
}

RealtimeSyncOptions extends SyncOptions with real-time capabilities

type RetryConfig added in v0.5.0

type RetryConfig struct {
	// MaxAttempts is the maximum number of retry attempts
	MaxAttempts int

	// InitialDelay is the initial delay between retries
	InitialDelay time.Duration

	// MaxDelay is the maximum delay between retries
	MaxDelay time.Duration

	// Multiplier is the factor by which the delay increases
	Multiplier float64
}

RetryConfig configures the retry behavior for sync operations

type SyncManager

type SyncManager interface {
	// Sync performs a bidirectional sync operation
	Sync(ctx context.Context) (*SyncResult, error)

	// Push sends local events to remote
	Push(ctx context.Context) (*SyncResult, error)

	// Pull retrieves remote events to local
	Pull(ctx context.Context) (*SyncResult, error)

	// StartAutoSync begins automatic synchronization at the configured interval
	StartAutoSync(ctx context.Context) error

	// StopAutoSync stops automatic synchronization
	StopAutoSync() error

	// Subscribe to sync events (optional)
	Subscribe(handler func(*SyncResult)) error

	// Close shuts down the sync manager
	Close() error
}

SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.

func NewSyncManager

func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions) SyncManager

NewSyncManager creates a new sync manager with the provided components

type SyncManagerBuilder added in v0.5.0

type SyncManagerBuilder struct {
	// contains filtered or unexported fields
}

SyncManagerBuilder provides a fluent interface for constructing SyncManager instances.

func NewSyncManagerBuilder added in v0.5.0

func NewSyncManagerBuilder() *SyncManagerBuilder

NewSyncManagerBuilder creates a new builder with default options.

func (*SyncManagerBuilder) Build added in v0.5.0

func (b *SyncManagerBuilder) Build() (SyncManager, error)

Build creates a new SyncManager instance with the configured options.

func (*SyncManagerBuilder) Reset added in v0.5.0

Reset clears the builder, allowing reuse.

func (*SyncManagerBuilder) WithBatchSize added in v0.5.0

func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder

WithBatchSize sets the batch size for sync operations.

func (*SyncManagerBuilder) WithCompression added in v0.5.0

func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder

WithCompression enables data compression during transport.

func (*SyncManagerBuilder) WithConflictResolver added in v0.5.0

func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder

WithConflictResolver sets the conflict resolution strategy.

func (*SyncManagerBuilder) WithFilter added in v0.5.0

func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder

WithFilter sets an event filter function.

func (*SyncManagerBuilder) WithPullOnly added in v0.5.0

func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder

WithPullOnly configures the SyncManager to only pull events.

func (*SyncManagerBuilder) WithPushOnly added in v0.5.0

func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder

WithPushOnly configures the SyncManager to only push events.

func (*SyncManagerBuilder) WithStore added in v0.5.0

func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder

WithStore sets the EventStore for the SyncManager.

func (*SyncManagerBuilder) WithSyncInterval added in v0.5.0

func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder

WithSyncInterval sets the interval for automatic synchronization.

func (*SyncManagerBuilder) WithTimeout added in v0.5.0

func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder

WithTimeout sets the maximum duration for sync operations.

func (*SyncManagerBuilder) WithTransport added in v0.5.0

func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder

WithTransport sets the Transport for the SyncManager.

func (*SyncManagerBuilder) WithValidation added in v0.5.0

func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder

WithValidation enables additional validation checks during sync operations.

type SyncOptions

type SyncOptions struct {
	// PushOnly indicates this client should only push events, not pull
	PushOnly bool

	// PullOnly indicates this client should only pull events, not push
	PullOnly bool

	// ConflictResolver to use for handling conflicts
	ConflictResolver ConflictResolver

	// Filter can be used to sync only specific events
	Filter func(Event) bool

	// BatchSize limits how many events to sync at once
	BatchSize int

	// SyncInterval for automatic periodic sync (0 disables)
	SyncInterval time.Duration

	// RetryConfig configures retry behavior for sync operations
	RetryConfig *RetryConfig

	// EnableValidation enables additional validation checks during sync
	EnableValidation bool

	// Timeout sets the maximum duration for sync operations
	Timeout time.Duration

	// EnableCompression enables data compression during transport
	EnableCompression bool

	// MetricsCollector for observability hooks (optional)
	MetricsCollector MetricsCollector
}

SyncOptions configures the synchronization behavior.
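The Filter field takes a plain `func(Event) bool`, so a filter can be built as a closure. The following sketch makes two assumptions. First, the Event interface has exactly the method set that TestEvent implements in this reference. Second, returning true means "include this event", which the field comment implies but does not spell out. `noteEvent` and `typePrefixFilter` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is ASSUMED to match the methods TestEvent implements in this reference.
type Event interface {
	ID() string
	Type() string
	AggregateID() string
	Data() interface{}
	Metadata() map[string]interface{}
}

// noteEvent is a hypothetical event type used only for this illustration.
type noteEvent struct{ id, kind string }

func (e noteEvent) ID() string                       { return e.id }
func (e noteEvent) Type() string                     { return e.kind }
func (e noteEvent) AggregateID() string              { return "notes" }
func (e noteEvent) Data() interface{}                { return nil }
func (e noteEvent) Metadata() map[string]interface{} { return nil }

// typePrefixFilter returns a function with the same shape as
// SyncOptions.Filter. It reports true for events whose Type starts with prefix.
func typePrefixFilter(prefix string) func(Event) bool {
	return func(e Event) bool {
		return strings.HasPrefix(e.Type(), prefix)
	}
}

func main() {
	keep := typePrefixFilter("note.")
	events := []Event{
		noteEvent{id: "1", kind: "note.created"},
		noteEvent{id: "2", kind: "user.created"},
	}
	for _, e := range events {
		fmt.Println(e.ID(), e.Type(), keep(e))
	}
}
```

A function built this way could be assigned to `SyncOptions.Filter` or passed to `WithFilter`, since both accept `func(Event) bool`.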

type SyncResult

type SyncResult struct {
	// EventsPushed is the number of events sent to remote
	EventsPushed int

	// EventsPulled is the number of events received from remote
	EventsPulled int

	// ConflictsResolved is the number of conflicts that were resolved
	ConflictsResolved int

	// Errors contains any non-fatal errors that occurred during sync
	Errors []error

	// StartTime is when the sync operation began
	StartTime time.Time

	// Duration is how long the sync took
	Duration time.Duration

	// LocalVersion is the local version after sync
	LocalVersion Version

	// RemoteVersion is the remote version after sync
	RemoteVersion Version
}

SyncResult provides information about a completed sync operation.
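Since Errors holds non-fatal errors separately from the error returned by Sync, Push, or Pull, callers will likely want to inspect both. One possible way to condense a result for logging is sketched here. `summarize` and `errSample` are hypothetical, the types are local copies of the documented declarations, and `errors.Join` requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Version and SyncResult mirror the documented declarations.
type Version interface {
	Compare(other Version) int
	String() string
	IsZero() bool
}

type SyncResult struct {
	EventsPushed      int
	EventsPulled      int
	ConflictsResolved int
	Errors            []error
	StartTime         time.Time
	Duration          time.Duration
	LocalVersion      Version
	RemoteVersion     Version
}

// errSample is a made-up non-fatal error used by main.
var errSample = errors.New("remote rejected one event")

// summarize is a hypothetical helper. It returns a one-line summary and the
// non-fatal errors joined into a single error (nil when there are none).
func summarize(r *SyncResult) (string, error) {
	line := fmt.Sprintf("pushed=%d pulled=%d conflicts=%d duration=%s",
		r.EventsPushed, r.EventsPulled, r.ConflictsResolved, r.Duration)
	return line, errors.Join(r.Errors...)
}

func main() {
	res := &SyncResult{
		EventsPushed:      2,
		EventsPulled:      3,
		ConflictsResolved: 1,
		Errors:            []error{errSample},
		Duration:          150 * time.Millisecond,
	}
	line, err := summarize(res)
	fmt.Println(line)
	if err != nil {
		fmt.Println("non-fatal:", err)
	}
}
```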

type TestEvent added in v0.5.0

type TestEvent struct{}

TestEvent implements Event interface for testing

func (*TestEvent) AggregateID added in v0.5.0

func (m *TestEvent) AggregateID() string

func (*TestEvent) Data added in v0.5.0

func (m *TestEvent) Data() interface{}

func (*TestEvent) ID added in v0.5.0

func (m *TestEvent) ID() string

func (*TestEvent) Metadata added in v0.5.0

func (m *TestEvent) Metadata() map[string]interface{}

func (*TestEvent) Type added in v0.5.0

func (m *TestEvent) Type() string

type TestEventStore added in v0.5.0

type TestEventStore struct {
	// contains filtered or unexported fields
}

TestEventStore implements a simple in-memory event store for testing

func (*TestEventStore) Close added in v0.5.0

func (m *TestEventStore) Close() error

func (*TestEventStore) LatestVersion added in v0.5.0

func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)

func (*TestEventStore) Load added in v0.5.0

func (*TestEventStore) LoadByAggregate added in v0.5.0

func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version) ([]EventWithVersion, error)

func (*TestEventStore) ParseVersion added in v0.5.0

func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)

func (*TestEventStore) Store added in v0.5.0

func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error

type TestTransport added in v0.5.0

type TestTransport struct {
	// contains filtered or unexported fields
}

TestTransport implements a simple transport for testing

func (*TestTransport) Close added in v0.5.0

func (m *TestTransport) Close() error

func (*TestTransport) GetLatestVersion added in v0.5.0

func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)

func (*TestTransport) Pull added in v0.5.0

func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)

func (*TestTransport) Push added in v0.5.0

func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error

func (*TestTransport) Subscribe added in v0.5.0

func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error

type TestVersion added in v0.5.0

type TestVersion struct{}

TestVersion implements Version interface for testing

func (*TestVersion) Compare added in v0.5.0

func (m *TestVersion) Compare(_ Version) int

func (*TestVersion) IsZero added in v0.5.0

func (m *TestVersion) IsZero() bool

func (*TestVersion) String added in v0.5.0

func (m *TestVersion) String() string

type Transport

type Transport interface {
	// Push sends events to the remote endpoint
	Push(ctx context.Context, events []EventWithVersion) error

	// Pull retrieves events from the remote endpoint since the given version
	Pull(ctx context.Context, since Version) ([]EventWithVersion, error)

	// GetLatestVersion efficiently retrieves the latest version from remote without pulling events
	GetLatestVersion(ctx context.Context) (Version, error)

	// Subscribe listens for real-time updates (optional for polling-based transports)
	Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error

	// Close closes the transport connection
	Close() error
}

Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.
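To make the contract concrete, here is a rough in-memory sketch of the five methods. It is not the package's TestTransport, and several pieces are assumptions: `EventWithVersion` is taken to pair an event with a Version (its real definition is outside this section), the event payload is reduced to a string, "since" is treated as exclusive, and `seq`, `memTransport`, and `demo` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type Version interface {
	Compare(other Version) int
	String() string
	IsZero() bool
}

// EventWithVersion is an ASSUMED shape; Event is a string stand-in here.
type EventWithVersion struct {
	Event   string
	Version Version
}

// Transport mirrors the documented interface.
type Transport interface {
	Push(ctx context.Context, events []EventWithVersion) error
	Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
	GetLatestVersion(ctx context.Context) (Version, error)
	Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error
	Close() error
}

// seq is a hypothetical integer version.
type seq int

func (s seq) Compare(other Version) int {
	o, _ := other.(seq) // a nil or non-seq version compares as zero
	switch {
	case s < o:
		return -1
	case s > o:
		return 1
	}
	return 0
}
func (s seq) String() string { return fmt.Sprint(int(s)) }
func (s seq) IsZero() bool   { return s == 0 }

// memTransport keeps pushed events in a mutex-guarded slice.
type memTransport struct {
	mu     sync.Mutex
	events []EventWithVersion
}

func (t *memTransport) Push(ctx context.Context, events []EventWithVersion) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	t.events = append(t.events, events...)
	return nil
}

// Pull returns events strictly newer than since (an assumed interpretation).
func (t *memTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	t.mu.Lock()
	defer t.mu.Unlock()
	var out []EventWithVersion
	for _, e := range t.events {
		if since == nil || e.Version.Compare(since) > 0 {
			out = append(out, e)
		}
	}
	return out, nil
}

func (t *memTransport) GetLatestVersion(_ context.Context) (Version, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	var latest Version = seq(0)
	for _, e := range t.events {
		if e.Version.Compare(latest) > 0 {
			latest = e.Version
		}
	}
	return latest, nil
}

// Subscribe is a no-op: this sketch behaves like a polling-only transport.
func (t *memTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error {
	return nil
}
func (t *memTransport) Close() error { return nil }

// demo pushes three events, pulls those after version 1, and reads the latest version.
func demo() (pulled int, latest string, err error) {
	ctx := context.Background()
	var t Transport = &memTransport{}
	if err = t.Push(ctx, []EventWithVersion{{"a", seq(1)}, {"b", seq(2)}, {"c", seq(3)}}); err != nil {
		return 0, "", err
	}
	events, err := t.Pull(ctx, seq(1))
	if err != nil {
		return 0, "", err
	}
	v, err := t.GetLatestVersion(ctx)
	if err != nil {
		return 0, "", err
	}
	return len(events), v.String(), nil
}

func main() { fmt.Println(demo()) }
```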

type Version

type Version interface {
	// Compare returns -1 if this version is before other, 0 if equal, 1 if after
	Compare(other Version) int

	// String returns a string representation of the version
	String() string

	// IsZero returns true if this is the zero/initial version
	IsZero() bool
}

Version represents a point-in-time snapshot for sync operations. Users can implement different versioning strategies (timestamps, hashes, vector clocks).
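As an illustration of the simplest strategy, the sketch below implements the three methods with a monotonically increasing counter. `counterVersion` is a hypothetical type, not one of the implementations shipped in the version subpackage. Treating a nil or foreign Version as zero is a choice made for this example only.

```go
package main

import (
	"fmt"
	"strconv"
)

// Version mirrors the documented interface.
type Version interface {
	Compare(other Version) int
	String() string
	IsZero() bool
}

// counterVersion is a hypothetical sequence-number version.
type counterVersion uint64

// Compare returns -1, 0, or 1. A nil or non-counterVersion argument is
// treated as the zero version (an assumption made for this sketch).
func (v counterVersion) Compare(other Version) int {
	o, _ := other.(counterVersion)
	switch {
	case v < o:
		return -1
	case v > o:
		return 1
	default:
		return 0
	}
}

// String renders the counter in base 10.
func (v counterVersion) String() string { return strconv.FormatUint(uint64(v), 10) }

// IsZero reports whether this is the initial version.
func (v counterVersion) IsZero() bool { return v == 0 }

// Compile-time check that counterVersion satisfies Version.
var _ Version = counterVersion(0)

func main() {
	a, b := counterVersion(1), counterVersion(2)
	fmt.Println(a.Compare(b), b.Compare(a), a.Compare(a))
	fmt.Println(b.String(), counterVersion(0).IsZero())
}
```

A single counter only orders events that pass through one authority. The timestamp and vector-clock strategies mentioned above trade that simplicity for the ability to compare versions produced independently.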

Directories

Path            Synopsis
errors          Package errors provides custom error types for the sync package.
storage/sqlite  Package sqlite provides a SQLite implementation of the go-sync-kit EventStore.
transport/http  Package http provides a client and server implementation for the go-sync-kit Transport over HTTP.
version         Package version provides various version implementations for the go-sync-kit library.
