Documentation
¶
Overview ¶
Package sync provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConflictResolver ¶
type ConflictResolver interface {
// Resolve handles a conflict between local and remote events
// Returns the resolved events to keep
Resolve(ctx context.Context, local, remote []EventWithVersion) ([]EventWithVersion, error)
}
ConflictResolver handles conflicts when the same data is modified concurrently. Different strategies can be plugged in (last-write-wins, merge, user-prompt, etc.).
type CursorTransport ¶
type CursorTransport interface {
Transport
// PullWithCursor retrieves events using a cursor-based pagination strategy.
// Returns events, next cursor, and any error.
PullWithCursor(ctx context.Context, since cursor.Cursor, limit int) ([]EventWithVersion, cursor.Cursor, error)
}
CursorTransport extends Transport with cursor-based capabilities.
type Event ¶
type Event interface {
// ID returns a unique identifier for this event
ID() string
// Type returns the event type (e.g., "UserCreated", "OrderUpdated")
Type() string
// AggregateID returns the ID of the aggregate this event belongs to
AggregateID() string
// Data returns the event payload
Data() interface{}
// Metadata returns additional event metadata
Metadata() map[string]interface{}
}
Event represents This interface should be implemented by user's event types.
type EventStore ¶
type EventStore interface {
// Store persists an event to the store
Store(ctx context.Context, event Event, version Version) error
// Load retrieves all events since the given version
Load(ctx context.Context, since Version) ([]EventWithVersion, error)
// LoadByAggregate retrieves events for a specific aggregate since the given version
LoadByAggregate(ctx context.Context, aggregateID string, since Version) ([]EventWithVersion, error)
// LatestVersion returns the latest version in the store
LatestVersion(ctx context.Context) (Version, error)
// ParseVersion converts a string representation into a Version
// This allows different storage implementations to handle their own version formats
ParseVersion(ctx context.Context, versionStr string) (Version, error)
// Close closes the store and releases resources
Close() error
}
EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).
type EventWithVersion ¶
EventWithVersion pairs an event with its version information
type MetricsCollector ¶
type MetricsCollector interface {
// RecordSyncDuration records how long a sync operation took
RecordSyncDuration(op string, d time.Duration)
// RecordSyncEvents records how many events were synced
RecordSyncEvents(pushed, pulled int)
// RecordConflicts records how many conflicts were resolved
RecordConflicts(count int)
// RecordSyncErrors records sync operation errors
RecordSyncErrors(op, reason string)
}
MetricsCollector provides hooks for observability.
type NoOpMetricsCollector ¶
type NoOpMetricsCollector struct{}
NoOpMetricsCollector is a stub implementation that discards metrics.
func (*NoOpMetricsCollector) RecordConflicts ¶
func (*NoOpMetricsCollector) RecordConflicts(count int)
func (*NoOpMetricsCollector) RecordSyncDuration ¶
func (*NoOpMetricsCollector) RecordSyncDuration(op string, d time.Duration)
func (*NoOpMetricsCollector) RecordSyncErrors ¶
func (*NoOpMetricsCollector) RecordSyncErrors(op, reason string)
func (*NoOpMetricsCollector) RecordSyncEvents ¶
func (*NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)
type RetryConfig ¶
type RetryConfig struct {
// MaxAttempts is the maximum number of retry attempts
MaxAttempts int
// InitialDelay is the initial delay between retries
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// Multiplier is the factor by which the delay increases
Multiplier float64
}
RetryConfig configures the retry behavior for sync operations
type SyncManager ¶
type SyncManager interface {
// Sync performs a bidirectional sync operation
Sync(ctx context.Context) (*SyncResult, error)
// Push sends local events to remote
Push(ctx context.Context) (*SyncResult, error)
// Pull retrieves remote events to local
Pull(ctx context.Context) (*SyncResult, error)
// StartAutoSync begins automatic synchronization at the configured interval
StartAutoSync(ctx context.Context) error
// StopAutoSync stops automatic synchronization
StopAutoSync() error
// Subscribe to sync events (optional)
Subscribe(handler func(*SyncResult)) error
// Close shuts down the sync manager
Close() error
}
SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.
func NewSyncManager ¶
func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions) SyncManager
NewSyncManager creates a new sync manager with the provided components
type SyncOptions ¶
type SyncOptions struct {
// LastCursorLoader loads the last saved cursor for cursor-based syncs
LastCursorLoader func() cursor.Cursor
// CursorSaver saves the latest cursor after a successful sync
CursorSaver func(cursor.Cursor) error
// PushOnly indicates this client should only push events, not pull
PushOnly bool
// PullOnly indicates this client should only pull events, not push
PullOnly bool
// ConflictResolver to use for handling conflicts
ConflictResolver ConflictResolver
// Filter can be used to sync only specific events
Filter func(Event) bool
// BatchSize limits how many events to sync at once
BatchSize int
// SyncInterval for automatic periodic sync (0 disables)
SyncInterval time.Duration
// RetryConfig configures retry behavior for sync operations
RetryConfig *RetryConfig
// EnableValidation enables additional validation checks during sync
EnableValidation bool
// Timeout sets the maximum duration for sync operations
Timeout time.Duration
// EnableCompression enables data compression during transport
EnableCompression bool
// MetricsCollector for observability hooks (optional)
MetricsCollector MetricsCollector
}
SyncOptions configures the synchronization behavior
type SyncResult ¶
type SyncResult struct {
// EventsPushed is the number of events sent to remote
EventsPushed int
// EventsPulled is the number of events received from remote
EventsPulled int
// ConflictsResolved is the number of conflicts that were resolved
ConflictsResolved int
// Errors contains any non-fatal errors that occurred during sync
Errors []error
// StartTime is when the sync operation began
StartTime time.Time
// Duration is how long the sync took
Duration time.Duration
// LocalVersion is the local version after sync
LocalVersion Version
// RemoteVersion is the remote version after sync
RemoteVersion Version
}
SyncResult provides information about a completed sync operation
type Transport ¶
type Transport interface {
// Push sends events to the remote endpoint
Push(ctx context.Context, events []EventWithVersion) error
// Pull retrieves events from the remote endpoint since the given version
Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
// GetLatestVersion efficiently retrieves the latest version from remote without pulling events
GetLatestVersion(ctx context.Context) (Version, error)
// Subscribe listens for real-time updates (optional for polling-based transports)
Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error
// Close closes the transport connection
Close() error
}
Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.
type Version ¶
type Version interface {
// Compare returns -1 if this version is before other, 0 if equal, 1 if after
Compare(other Version) int
// String returns a string representation of the version
String() string
// IsZero returns true if this is the zero/initial version
IsZero() bool
}
Version represents a point-in-time snapshot for sync operations. Users can implement different versioning strategies (timestamps, hashes, vector clocks).