Documentation
¶
Overview ¶
Package sync provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.
Index ¶
- type BackoffStrategy
- type ConflictResolver
- type ConnectionStatus
- type CursorTransport
- type Event
- type EventStore
- type EventWithVersion
- type ExponentialBackoff
- type MetricsCollector
- type MockMetricsCollector
- func (m *MockMetricsCollector) RecordConflicts(resolved int)
- func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
- func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)
- func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)
- type NoOpMetricsCollector
- func (n *NoOpMetricsCollector) RecordConflicts(resolved int)
- func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
- func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)
- func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)
- type Notification
- type NotificationFilter
- type NotificationHandler
- type RealtimeNotifier
- type RealtimeSyncManager
- type RealtimeSyncOptions
- type RetryConfig
- type SyncManager
- type SyncManagerBuilder
- func (b *SyncManagerBuilder) Build() (SyncManager, error)
- func (b *SyncManagerBuilder) Reset() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder
- type SyncOptions
- type SyncResult
- type TestEvent
- type TestEventStore
- func (m *TestEventStore) Close() error
- func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)
- func (m *TestEventStore) Load(_ context.Context, _ Version) ([]EventWithVersion, error)
- func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version) ([]EventWithVersion, error)
- func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)
- func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error
- type TestTransport
- func (m *TestTransport) Close() error
- func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)
- func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
- func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error
- func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error
- type TestVersion
- type Transport
- type Version
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BackoffStrategy ¶
type BackoffStrategy interface {
// NextDelay returns the delay before the next reconnection attempt
NextDelay(attempt int) time.Duration
// Reset resets the backoff strategy after a successful connection
Reset()
}
BackoffStrategy defines how to handle reconnection delays.
type ConflictResolver ¶
type ConflictResolver interface {
// Resolve handles a conflict between local and remote events
// Returns the resolved events to keep
Resolve(ctx context.Context, local, remote []EventWithVersion) ([]EventWithVersion, error)
}
ConflictResolver handles conflicts when the same data is modified concurrently. Different strategies can be plugged in (last-write-wins, merge, user-prompt, etc.).
type ConnectionStatus ¶
type ConnectionStatus struct {
Connected bool
LastConnected time.Time
ReconnectAttempts int
Error error
}
ConnectionStatus represents the state of the real-time connection.
type CursorTransport ¶ added in v0.8.0
type CursorTransport interface {
Transport
// PullWithCursor retrieves events using a cursor-based pagination strategy.
// Returns events, next cursor, and any error.
PullWithCursor(ctx context.Context, since cursor.Cursor, limit int) ([]EventWithVersion, cursor.Cursor, error)
}
CursorTransport extends Transport with cursor-based capabilities.
type Event ¶
type Event interface {
// ID returns a unique identifier for this event
ID() string
// Type returns the event type (e.g., "UserCreated", "OrderUpdated")
Type() string
// AggregateID returns the ID of the aggregate this event belongs to
AggregateID() string
// Data returns the event payload
Data() interface{}
// Metadata returns additional event metadata
Metadata() map[string]interface{}
}
Event represents a syncable event in the system. This interface should be implemented by the user's event types.
type EventStore ¶
type EventStore interface {
// Store persists an event to the store
Store(ctx context.Context, event Event, version Version) error
// Load retrieves all events since the given version
Load(ctx context.Context, since Version) ([]EventWithVersion, error)
// LoadByAggregate retrieves events for a specific aggregate since the given version
LoadByAggregate(ctx context.Context, aggregateID string, since Version) ([]EventWithVersion, error)
// LatestVersion returns the latest version in the store
LatestVersion(ctx context.Context) (Version, error)
// ParseVersion converts a string representation into a Version
// This allows different storage implementations to handle their own version formats
ParseVersion(ctx context.Context, versionStr string) (Version, error)
// Close closes the store and releases resources
Close() error
}
EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).
type EventWithVersion ¶
EventWithVersion pairs an event with its version information.
type ExponentialBackoff ¶
type ExponentialBackoff struct {
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
}
ExponentialBackoff implements BackoffStrategy using exponential backoff with jitter.
func (*ExponentialBackoff) NextDelay ¶
func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration
func (*ExponentialBackoff) Reset ¶
func (eb *ExponentialBackoff) Reset()
type MetricsCollector ¶
type MetricsCollector interface {
// RecordSyncDuration records how long a sync operation took
RecordSyncDuration(operation string, duration time.Duration)
// RecordSyncEvents records the number of events pushed and pulled
RecordSyncEvents(pushed, pulled int)
// RecordSyncErrors records sync operation errors by type
RecordSyncErrors(operation string, errorType string)
// RecordConflicts records the number of conflicts resolved
RecordConflicts(resolved int)
}
MetricsCollector provides hooks for collecting sync operation metrics.
type MockMetricsCollector ¶
type MockMetricsCollector struct {
DurationCalls []struct {
Operation string
Duration time.Duration
}
EventCalls []struct {
Pushed, Pulled int
}
ErrorCalls []struct {
Operation, ErrorType string
}
ConflictCalls []struct {
Resolved int
}
}
MockMetricsCollector implements the MetricsCollector interface for testing.
func (*MockMetricsCollector) RecordConflicts ¶
func (m *MockMetricsCollector) RecordConflicts(resolved int)
func (*MockMetricsCollector) RecordSyncDuration ¶
func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
func (*MockMetricsCollector) RecordSyncErrors ¶
func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)
func (*MockMetricsCollector) RecordSyncEvents ¶
func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)
type NoOpMetricsCollector ¶
type NoOpMetricsCollector struct{}
NoOpMetricsCollector is a default implementation that does nothing.
func (*NoOpMetricsCollector) RecordConflicts ¶
func (n *NoOpMetricsCollector) RecordConflicts(resolved int)
func (*NoOpMetricsCollector) RecordSyncDuration ¶
func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
func (*NoOpMetricsCollector) RecordSyncErrors ¶
func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)
func (*NoOpMetricsCollector) RecordSyncEvents ¶
func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)
type Notification ¶
type Notification struct {
// Type of notification (data_changed, sync_requested, etc.)
Type string
// Source identifies where the change originated
Source string
// AggregateIDs that were affected (optional, for filtering)
AggregateIDs []string
// Metadata for additional context
Metadata map[string]interface{}
// Timestamp when the notification was created
Timestamp time.Time
}
Notification represents a real-time update notification.
type NotificationFilter ¶
type NotificationFilter func(notification Notification) bool
NotificationFilter allows filtering which notifications to process.
type NotificationHandler ¶
type NotificationHandler func(notification Notification) error
NotificationHandler processes incoming real-time notifications.
type RealtimeNotifier ¶
type RealtimeNotifier interface {
// Subscribe starts listening for real-time notifications
// The handler will be called when new data is available for sync
Subscribe(ctx context.Context, handler NotificationHandler) error
// Unsubscribe stops listening for notifications
Unsubscribe() error
// Notify sends a notification to connected clients (server-side)
Notify(ctx context.Context, notification Notification) error
// IsConnected returns true if the real-time connection is active
IsConnected() bool
// Close closes the notifier connection
Close() error
}
RealtimeNotifier provides real-time push notifications when data changes. This is separate from Transport to allow for different notification mechanisms (WebSockets, SSE, NATS, etc.) while keeping the sync logic agnostic.
type RealtimeSyncManager ¶
type RealtimeSyncManager interface {
SyncManager
// EnableRealtime starts real-time notifications
EnableRealtime(ctx context.Context) error
// DisableRealtime stops real-time notifications and falls back to polling
DisableRealtime() error
// IsRealtimeActive returns true if real-time notifications are active
IsRealtimeActive() bool
// GetConnectionStatus returns the current real-time connection status
GetConnectionStatus() ConnectionStatus
}
RealtimeSyncManager extends SyncManager with real-time capabilities.
func NewRealtimeSyncManager ¶
func NewRealtimeSyncManager(store EventStore, transport Transport, options *RealtimeSyncOptions) RealtimeSyncManager
NewRealtimeSyncManager creates a sync manager with real-time capabilities.
type RealtimeSyncOptions ¶
type RealtimeSyncOptions struct {
SyncOptions
// RealtimeNotifier for push notifications (optional)
RealtimeNotifier RealtimeNotifier
// NotificationFilter to process only relevant notifications
NotificationFilter NotificationFilter
// DisablePolling stops periodic sync when real-time is active
DisablePolling bool
// FallbackInterval for polling when real-time connection is lost
FallbackInterval time.Duration
// ReconnectBackoff strategy for reconnecting to real-time notifications
ReconnectBackoff BackoffStrategy
}
RealtimeSyncOptions extends SyncOptions with real-time capabilities.
type RetryConfig ¶
type RetryConfig struct {
// MaxAttempts is the maximum number of retry attempts
MaxAttempts int
// InitialDelay is the initial delay between retries
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// Multiplier is the factor by which the delay increases
Multiplier float64
}
RetryConfig configures the retry behavior for sync operations.
type SyncManager ¶
type SyncManager interface {
// Sync performs a bidirectional sync operation
Sync(ctx context.Context) (*SyncResult, error)
// Push sends local events to remote
Push(ctx context.Context) (*SyncResult, error)
// Pull retrieves remote events to local
Pull(ctx context.Context) (*SyncResult, error)
// StartAutoSync begins automatic synchronization at the configured interval
StartAutoSync(ctx context.Context) error
// StopAutoSync stops automatic synchronization
StopAutoSync() error
// Subscribe to sync events (optional)
Subscribe(handler func(*SyncResult)) error
// Close shuts down the sync manager
Close() error
}
SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.
func NewSyncManager ¶
func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions, logger *slog.Logger) SyncManager
NewSyncManager creates a new sync manager with the provided components.
type SyncManagerBuilder ¶
type SyncManagerBuilder struct {
// contains filtered or unexported fields
}
SyncManagerBuilder provides a fluent interface for constructing SyncManager instances.
func NewSyncManagerBuilder ¶
func NewSyncManagerBuilder() *SyncManagerBuilder
NewSyncManagerBuilder creates a new builder with default options.
func (*SyncManagerBuilder) Build ¶
func (b *SyncManagerBuilder) Build() (SyncManager, error)
Build creates a new SyncManager instance with the configured options.
func (*SyncManagerBuilder) Reset ¶
func (b *SyncManagerBuilder) Reset() *SyncManagerBuilder
Reset clears the builder, allowing reuse.
func (*SyncManagerBuilder) WithBatchSize ¶
func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder
WithBatchSize sets the batch size for sync operations.
func (*SyncManagerBuilder) WithCompression ¶
func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder
WithCompression enables data compression during transport.
func (*SyncManagerBuilder) WithConflictResolver ¶
func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder
WithConflictResolver sets the conflict resolution strategy.
func (*SyncManagerBuilder) WithFilter ¶
func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder
WithFilter sets an event filter function.
func (*SyncManagerBuilder) WithLogger ¶ added in v0.12.0
func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder
WithLogger sets a custom logger for the SyncManager.
func (*SyncManagerBuilder) WithPullOnly ¶
func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder
WithPullOnly configures the SyncManager to only pull events.
func (*SyncManagerBuilder) WithPushOnly ¶
func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder
WithPushOnly configures the SyncManager to only push events.
func (*SyncManagerBuilder) WithStore ¶
func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder
WithStore sets the EventStore for the SyncManager.
func (*SyncManagerBuilder) WithSyncInterval ¶
func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder
WithSyncInterval sets the interval for automatic synchronization.
func (*SyncManagerBuilder) WithTimeout ¶
func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder
WithTimeout sets the maximum duration for sync operations.
func (*SyncManagerBuilder) WithTransport ¶
func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder
WithTransport sets the Transport for the SyncManager.
func (*SyncManagerBuilder) WithValidation ¶
func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder
WithValidation enables additional validation checks during sync operations.
type SyncOptions ¶
type SyncOptions struct {
// LastCursorLoader loads the last saved cursor for cursor-based syncs
LastCursorLoader func() cursor.Cursor
// CursorSaver saves the latest cursor after a successful sync
CursorSaver func(cursor.Cursor) error
// PushOnly indicates this client should only push events, not pull
PushOnly bool
// PullOnly indicates this client should only pull events, not push
PullOnly bool
// ConflictResolver to use for handling conflicts
ConflictResolver ConflictResolver
// Filter can be used to sync only specific events
Filter func(Event) bool
// BatchSize limits how many events to sync at once
BatchSize int
// SyncInterval for automatic periodic sync (0 disables)
SyncInterval time.Duration
// RetryConfig configures retry behavior for sync operations
RetryConfig *RetryConfig
// EnableValidation enables additional validation checks during sync
EnableValidation bool
// Timeout sets the maximum duration for sync operations
Timeout time.Duration
// EnableCompression enables data compression during transport
EnableCompression bool
// MetricsCollector for observability hooks (optional)
MetricsCollector MetricsCollector
}
SyncOptions configures the synchronization behavior.
type SyncResult ¶
type SyncResult struct {
// EventsPushed is the number of events sent to remote
EventsPushed int
// EventsPulled is the number of events received from remote
EventsPulled int
// ConflictsResolved is the number of conflicts that were resolved
ConflictsResolved int
// Errors contains any non-fatal errors that occurred during sync
Errors []error
// StartTime is when the sync operation began
StartTime time.Time
// Duration is how long the sync took
Duration time.Duration
// LocalVersion is the local version after sync
LocalVersion Version
// RemoteVersion is the remote version after sync
RemoteVersion Version
}
SyncResult provides information about a completed sync operation.
type TestEvent ¶
type TestEvent struct{}
TestEvent implements the Event interface for testing.
func (*TestEvent) AggregateID ¶
type TestEventStore ¶
type TestEventStore struct {
// contains filtered or unexported fields
}
TestEventStore implements a simple in-memory event store for testing.
func (*TestEventStore) Close ¶
func (m *TestEventStore) Close() error
func (*TestEventStore) LatestVersion ¶
func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)
func (*TestEventStore) Load ¶
func (m *TestEventStore) Load(_ context.Context, _ Version) ([]EventWithVersion, error)
func (*TestEventStore) LoadByAggregate ¶
func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version) ([]EventWithVersion, error)
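func (*TestEventStore) ParseVersion ¶
func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)
func (*TestEventStore) Store ¶
func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error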
type TestTransport ¶
type TestTransport struct {
// contains filtered or unexported fields
}
TestTransport implements a simple transport for testing.
func (*TestTransport) Close ¶
func (m *TestTransport) Close() error
func (*TestTransport) GetLatestVersion ¶
func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)
func (*TestTransport) Pull ¶
func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
func (*TestTransport) Push ¶
func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error
func (*TestTransport) Subscribe ¶
func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error
type TestVersion ¶
type TestVersion struct{}
TestVersion implements the Version interface for testing.
func (*TestVersion) Compare ¶
func (m *TestVersion) Compare(_ Version) int
func (*TestVersion) IsZero ¶
func (m *TestVersion) IsZero() bool
func (*TestVersion) String ¶
func (m *TestVersion) String() string
type Transport ¶
type Transport interface {
// Push sends events to the remote endpoint
Push(ctx context.Context, events []EventWithVersion) error
// Pull retrieves events from the remote endpoint since the given version
Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
// GetLatestVersion efficiently retrieves the latest version from remote without pulling events
GetLatestVersion(ctx context.Context) (Version, error)
// Subscribe listens for real-time updates (optional for polling-based transports)
Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error
// Close closes the transport connection
Close() error
}
Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.
type Version ¶
type Version = interfaces.Version
Version represents a point-in-time snapshot for sync operations. Users can implement different versioning strategies (timestamps, hashes, vector clocks).