synckit

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package sync provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BackoffStrategy

type BackoffStrategy interface {
	// NextDelay returns the delay before the next reconnection attempt
	NextDelay(attempt int) time.Duration

	// Reset resets the backoff strategy after a successful connection
	Reset()
}

BackoffStrategy defines how to handle reconnection delays

type ConflictResolver

type ConflictResolver interface {
	// Resolve handles a conflict between local and remote events
	// Returns the resolved events to keep
	Resolve(ctx context.Context, local, remote []EventWithVersion) ([]EventWithVersion, error)
}

ConflictResolver handles conflicts when the same data is modified concurrently. Different strategies can be plugged in (last-write-wins, merge, user-prompt, etc.).

type ConnectionStatus

type ConnectionStatus struct {
	Connected         bool
	LastConnected     time.Time
	ReconnectAttempts int
	Error             error
}

ConnectionStatus represents the state of the real-time connection

type CursorTransport added in v0.8.0

type CursorTransport interface {
	Transport

	// PullWithCursor retrieves events using a cursor-based pagination strategy.
	// Returns events, next cursor, and any error.
	PullWithCursor(ctx context.Context, since cursor.Cursor, limit int) ([]EventWithVersion, cursor.Cursor, error)
}

CursorTransport extends Transport with cursor-based capabilities.

type Event

type Event interface {
	// ID returns a unique identifier for this event
	ID() string

	// Type returns the event type (e.g., "UserCreated", "OrderUpdated")
	Type() string

	// AggregateID returns the ID of the aggregate this event belongs to
	AggregateID() string

	// Data returns the event payload
	Data() interface{}

	// Metadata returns additional event metadata
	Metadata() map[string]interface{}
}

Event represents a syncable event in the system. This interface should be implemented by user's event types.

type EventStore

type EventStore interface {
	// Store persists an event to the store
	Store(ctx context.Context, event Event, version Version) error

	// Load retrieves all events since the given version
	Load(ctx context.Context, since Version) ([]EventWithVersion, error)

	// LoadByAggregate retrieves events for a specific aggregate since the given version
	LoadByAggregate(ctx context.Context, aggregateID string, since Version) ([]EventWithVersion, error)

	// LatestVersion returns the latest version in the store
	LatestVersion(ctx context.Context) (Version, error)

	// ParseVersion converts a string representation into a Version
	// This allows different storage implementations to handle their own version formats
	ParseVersion(ctx context.Context, versionStr string) (Version, error)

	// Close closes the store and releases resources
	Close() error
}

EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).

type EventWithVersion

type EventWithVersion struct {
	Event   Event
	Version Version
}

EventWithVersion pairs an event with its version information

type ExponentialBackoff

type ExponentialBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

ExponentialBackoff implements exponential backoff with jitter

func (*ExponentialBackoff) NextDelay

func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration

func (*ExponentialBackoff) Reset

func (eb *ExponentialBackoff) Reset()

type MetricsCollector

type MetricsCollector interface {
	// RecordSyncDuration records how long a sync operation took
	RecordSyncDuration(operation string, duration time.Duration)

	// RecordSyncEvents records the number of events pushed and pulled
	RecordSyncEvents(pushed, pulled int)

	// RecordSyncErrors records sync operation errors by type
	RecordSyncErrors(operation string, errorType string)

	// RecordConflicts records the number of conflicts resolved
	RecordConflicts(resolved int)
}

MetricsCollector provides hooks for collecting sync operation metrics

type MockMetricsCollector

type MockMetricsCollector struct {
	DurationCalls []struct {
		Operation string
		Duration  time.Duration
	}
	EventCalls []struct {
		Pushed, Pulled int
	}
	ErrorCalls []struct {
		Operation, ErrorType string
	}
	ConflictCalls []struct {
		Resolved int
	}
}

MockMetricsCollector implements MetricsCollector interface for testing

func (*MockMetricsCollector) RecordConflicts

func (m *MockMetricsCollector) RecordConflicts(resolved int)

func (*MockMetricsCollector) RecordSyncDuration

func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*MockMetricsCollector) RecordSyncErrors

func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*MockMetricsCollector) RecordSyncEvents

func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)

type NoOpMetricsCollector

type NoOpMetricsCollector struct{}

NoOpMetricsCollector is a default implementation that does nothing

func (*NoOpMetricsCollector) RecordConflicts

func (n *NoOpMetricsCollector) RecordConflicts(resolved int)

func (*NoOpMetricsCollector) RecordSyncDuration

func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*NoOpMetricsCollector) RecordSyncErrors

func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*NoOpMetricsCollector) RecordSyncEvents

func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)

type Notification

type Notification struct {
	// Type of notification (data_changed, sync_requested, etc.)
	Type string

	// Source identifies where the change originated
	Source string

	// AggregateIDs that were affected (optional, for filtering)
	AggregateIDs []string

	// Metadata for additional context
	Metadata map[string]interface{}

	// Timestamp when the notification was created
	Timestamp time.Time
}

Notification represents a real-time update notification

type NotificationFilter

type NotificationFilter func(notification Notification) bool

NotificationFilter allows filtering which notifications to process

type NotificationHandler

type NotificationHandler func(notification Notification) error

NotificationHandler processes incoming real-time notifications

type RealtimeNotifier

type RealtimeNotifier interface {
	// Subscribe starts listening for real-time notifications
	// The handler will be called when new data is available for sync
	Subscribe(ctx context.Context, handler NotificationHandler) error

	// Unsubscribe stops listening for notifications
	Unsubscribe() error

	// Notify sends a notification to connected clients (server-side)
	Notify(ctx context.Context, notification Notification) error

	// IsConnected returns true if the real-time connection is active
	IsConnected() bool

	// Close closes the notifier connection
	Close() error
}

RealtimeNotifier provides real-time push notifications when data changes. This is separate from Transport to allow for different notification mechanisms (WebSockets, SSE, NATS, etc.) while keeping the sync logic agnostic.

type RealtimeSyncManager

type RealtimeSyncManager interface {
	SyncManager

	// EnableRealtime starts real-time notifications
	EnableRealtime(ctx context.Context) error

	// DisableRealtime stops real-time notifications and falls back to polling
	DisableRealtime() error

	// IsRealtimeActive returns true if real-time notifications are active
	IsRealtimeActive() bool

	// GetConnectionStatus returns the current real-time connection status
	GetConnectionStatus() ConnectionStatus
}

RealtimeSyncManager extends SyncManager with real-time capabilities

func NewRealtimeSyncManager

func NewRealtimeSyncManager(store EventStore, transport Transport, options *RealtimeSyncOptions) RealtimeSyncManager

NewRealtimeSyncManager creates a sync manager with real-time capabilities

type RealtimeSyncOptions

type RealtimeSyncOptions struct {
	SyncOptions

	// RealtimeNotifier for push notifications (optional)
	RealtimeNotifier RealtimeNotifier

	// NotificationFilter to process only relevant notifications
	NotificationFilter NotificationFilter

	// DisablePolling stops periodic sync when real-time is active
	DisablePolling bool

	// FallbackInterval for polling when real-time connection is lost
	FallbackInterval time.Duration

	// ReconnectBackoff strategy for reconnecting to real-time notifications
	ReconnectBackoff BackoffStrategy
}

RealtimeSyncOptions extends SyncOptions with real-time capabilities

type RetryConfig

type RetryConfig struct {
	// MaxAttempts is the maximum number of retry attempts
	MaxAttempts int

	// InitialDelay is the initial delay between retries
	InitialDelay time.Duration

	// MaxDelay is the maximum delay between retries
	MaxDelay time.Duration

	// Multiplier is the factor by which the delay increases
	Multiplier float64
}

RetryConfig configures the retry behavior for sync operations

type SyncManager

type SyncManager interface {
	// Sync performs a bidirectional sync operation
	Sync(ctx context.Context) (*SyncResult, error)

	// Push sends local events to remote
	Push(ctx context.Context) (*SyncResult, error)

	// Pull retrieves remote events to local
	Pull(ctx context.Context) (*SyncResult, error)

	// StartAutoSync begins automatic synchronization at the configured interval
	StartAutoSync(ctx context.Context) error

	// StopAutoSync stops automatic synchronization
	StopAutoSync() error

	// Subscribe to sync events (optional)
	Subscribe(handler func(*SyncResult)) error

	// Close shuts down the sync manager
	Close() error
}

SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.

func NewSyncManager

func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions) SyncManager

NewSyncManager creates a new sync manager with the provided components

type SyncManagerBuilder

type SyncManagerBuilder struct {
	// contains filtered or unexported fields
}

SyncManagerBuilder provides a fluent interface for constructing SyncManager instances.

func NewSyncManagerBuilder

func NewSyncManagerBuilder() *SyncManagerBuilder

NewSyncManagerBuilder creates a new builder with default options.

func (*SyncManagerBuilder) Build

func (b *SyncManagerBuilder) Build() (SyncManager, error)

Build creates a new SyncManager instance with the configured options.

func (*SyncManagerBuilder) Reset

Reset clears the builder, allowing reuse.

func (*SyncManagerBuilder) WithBatchSize

func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder

WithBatchSize sets the batch size for sync operations.

func (*SyncManagerBuilder) WithCompression

func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder

WithCompression enables data compression during transport.

func (*SyncManagerBuilder) WithConflictResolver

func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder

WithConflictResolver sets the conflict resolution strategy.

func (*SyncManagerBuilder) WithFilter

func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder

WithFilter sets an event filter function.

func (*SyncManagerBuilder) WithPullOnly

func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder

WithPullOnly configures the SyncManager to only pull events.

func (*SyncManagerBuilder) WithPushOnly

func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder

WithPushOnly configures the SyncManager to only push events.

func (*SyncManagerBuilder) WithStore

func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder

WithStore sets the EventStore for the SyncManager.

func (*SyncManagerBuilder) WithSyncInterval

func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder

WithSyncInterval sets the interval for automatic synchronization.

func (*SyncManagerBuilder) WithTimeout

func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder

WithTimeout sets the maximum duration for sync operations.

func (*SyncManagerBuilder) WithTransport

func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder

WithTransport sets the Transport for the SyncManager.

func (*SyncManagerBuilder) WithValidation

func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder

WithValidation enables additional validation checks during sync operations.

type SyncOptions

type SyncOptions struct {
	// LastCursorLoader loads the last saved cursor for cursor-based syncs
	LastCursorLoader func() cursor.Cursor

	// CursorSaver saves the latest cursor after a successful sync
	CursorSaver func(cursor.Cursor) error

	// PushOnly indicates this client should only push events, not pull
	PushOnly bool

	// PullOnly indicates this client should only pull events, not push
	PullOnly bool

	// ConflictResolver to use for handling conflicts
	ConflictResolver ConflictResolver

	// Filter can be used to sync only specific events
	Filter func(Event) bool

	// BatchSize limits how many events to sync at once
	BatchSize int

	// SyncInterval for automatic periodic sync (0 disables)
	SyncInterval time.Duration

	// RetryConfig configures retry behavior for sync operations
	RetryConfig *RetryConfig

	// EnableValidation enables additional validation checks during sync
	EnableValidation bool

	// Timeout sets the maximum duration for sync operations
	Timeout time.Duration

	// EnableCompression enables data compression during transport
	EnableCompression bool

	// MetricsCollector for observability hooks (optional)
	MetricsCollector MetricsCollector
}

SyncOptions configures the synchronization behavior

type SyncResult

type SyncResult struct {
	// EventsPushed is the number of events sent to remote
	EventsPushed int

	// EventsPulled is the number of events received from remote
	EventsPulled int

	// ConflictsResolved is the number of conflicts that were resolved
	ConflictsResolved int

	// Errors contains any non-fatal errors that occurred during sync
	Errors []error

	// StartTime is when the sync operation began
	StartTime time.Time

	// Duration is how long the sync took
	Duration time.Duration

	// LocalVersion is the local version after sync
	LocalVersion Version

	// RemoteVersion is the remote version after sync
	RemoteVersion Version
}

SyncResult provides information about a completed sync operation

type TestEvent

type TestEvent struct{}

TestEvent implements Event interface for testing

func (*TestEvent) AggregateID

func (m *TestEvent) AggregateID() string

func (*TestEvent) Data

func (m *TestEvent) Data() interface{}

func (*TestEvent) ID

func (m *TestEvent) ID() string

func (*TestEvent) Metadata

func (m *TestEvent) Metadata() map[string]interface{}

func (*TestEvent) Type

func (m *TestEvent) Type() string

type TestEventStore

type TestEventStore struct {
	// contains filtered or unexported fields
}

TestEventStore implements a simple in-memory event store for testing

func (*TestEventStore) Close

func (m *TestEventStore) Close() error

func (*TestEventStore) LatestVersion

func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)

func (*TestEventStore) Load

func (*TestEventStore) LoadByAggregate

func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version) ([]EventWithVersion, error)

func (*TestEventStore) ParseVersion

func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)

func (*TestEventStore) Store

func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error

type TestTransport

type TestTransport struct {
	// contains filtered or unexported fields
}

TestTransport implements a simple transport for testing

func (*TestTransport) Close

func (m *TestTransport) Close() error

func (*TestTransport) GetLatestVersion

func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)

func (*TestTransport) Pull

func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)

func (*TestTransport) Push

func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error

func (*TestTransport) Subscribe

func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error

type TestVersion

type TestVersion struct{}

TestVersion implements Version interface for testing

func (*TestVersion) Compare

func (m *TestVersion) Compare(_ Version) int

func (*TestVersion) IsZero

func (m *TestVersion) IsZero() bool

func (*TestVersion) String

func (m *TestVersion) String() string

type Transport

type Transport interface {
	// Push sends events to the remote endpoint
	Push(ctx context.Context, events []EventWithVersion) error

	// Pull retrieves events from the remote endpoint since the given version
	Pull(ctx context.Context, since Version) ([]EventWithVersion, error)

	// GetLatestVersion efficiently retrieves the latest version from remote without pulling events
	GetLatestVersion(ctx context.Context) (Version, error)

	// Subscribe listens for real-time updates (optional for polling-based transports)
	Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error

	// Close closes the transport connection
	Close() error
}

Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.

type Version

type Version = interfaces.Version

Version represents a point-in-time snapshot for sync operations. Users can implement different versioning strategies (timestamps, hashes, vector clocks).

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL