Documentation
¶
Overview ¶
Package synckit provides aliases for backward compatibility and future expansion.
Package synckit provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.
Index ¶
- type AdditiveMergeResolver
- type AuditableOption
- type AuditableResolver
- type BackoffStrategy
- type BasicValidator
- type CompositeOption
- type CompositeResolver
- type ConfigLoader
- type ConfigLoaderOption
- type ConfigSettings
- type ConfigTransformer
- type ConfigValidator
- type ConfigWatcher
- type Conflict
- type ConflictResolver
- type ConflictState
- type ConnectionStatus
- type CursorTransport
- type DynamicResolver
- type Event
- type EventStore
- type EventWithVersion
- type ExponentialBackoff
- type ExtendedMetricsCollector
- type ExtendedNoOpMetricsCollector
- func (c *ExtendedNoOpMetricsCollector) RecordFallbackUsage(group string)
- func (c *ExtendedNoOpMetricsCollector) RecordManualReview(reason string)
- func (c *ExtendedNoOpMetricsCollector) RecordResolution(rule, group, decision string, dur time.Duration, success bool)
- func (c *ExtendedNoOpMetricsCollector) RecordResolutionError(rule, group, errType string)
- func (c *ExtendedNoOpMetricsCollector) RecordRuleMatch(rule, group string)
- type FirstWriteWinsResolver
- type GroupConfig
- type Hooks
- type InMemoryMementoCaretaker
- func (c *InMemoryMementoCaretaker) Delete(ctx context.Context, id string) error
- func (c *InMemoryMementoCaretaker) Get(ctx context.Context, id string) (*ResolutionMemento, error)
- func (c *InMemoryMementoCaretaker) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
- func (c *InMemoryMementoCaretaker) List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)
- func (c *InMemoryMementoCaretaker) Save(ctx context.Context, memento *ResolutionMemento) error
- type LastWriteWinsResolver
- type LimitSettings
- type Logger
- type LoggingWatcher
- type ManagerOption
- func WithAdditiveMerge() ManagerOption
- func WithBatchSize(n int) ManagerOption
- func WithCompression(enabled bool) ManagerOption
- func WithConflictResolver(r ConflictResolver) ManagerOption
- func WithFWW() ManagerOption
- func WithFilter(filter func(Event) bool) ManagerOption
- func WithHTTPTransport(baseURL string) ManagerOption
- func WithLWW() ManagerOption
- func WithManagerLogger(logger *slog.Logger) ManagerOption
- func WithNullTransport() ManagerOption
- func WithPullOnly() ManagerOption
- func WithPushOnly() ManagerOption
- func WithSQLite(path string) ManagerOption
- func WithStore(s EventStore) ManagerOption
- func WithSyncInterval(interval time.Duration) ManagerOption
- func WithTimeout(d time.Duration) ManagerOption
- func WithTransport(t Transport) ManagerOption
- func WithValidation() ManagerOption
- type ManualReviewResolver
- type MatchConditions
- type MementoCaretaker
- type MementoCriteria
- type MetricsCollector
- type MockMetricsCollector
- func (m *MockMetricsCollector) RecordConflicts(resolved int)
- func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
- func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)
- func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)
- type NoOpMetricsCollector
- func (n *NoOpMetricsCollector) RecordConflicts(resolved int)
- func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
- func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)
- func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)
- type Notification
- type NotificationFilter
- type NotificationHandler
- type ObservableOption
- type ObservableResolver
- type Option
- func WithEventTypeRule(name string, eventType string, resolver ConflictResolver) Option
- func WithFallback(r ConflictResolver) Option
- func WithHooks(h Hooks) Option
- func WithLogger(l any) Option
- func WithRule(name string, matcher Spec, resolver ConflictResolver) Option
- func WithValidator(v Validator) Option
- type RealtimeNotifier
- type RealtimeSyncManager
- type RealtimeSyncOptions
- type ResolutionConfig
- type ResolutionHooks
- type ResolutionMemento
- type ResolvedConflict
- type RetryConfig
- type RollbackAnalysis
- type RollbackCapability
- type Rule
- type RuleConfig
- type RuleConfigEntry
- type RuleGroup
- func (rg *RuleGroup) AddRule(rule Rule) *RuleGroup
- func (rg *RuleGroup) AddSubGroup(subGroup *RuleGroup) *RuleGroup
- func (rg *RuleGroup) Disable() *RuleGroup
- func (rg *RuleGroup) Enable() *RuleGroup
- func (rg *RuleGroup) GetAllRules() []Rule
- func (rg *RuleGroup) GetStats() RuleGroupStats
- func (rg *RuleGroup) IsEnabled() bool
- func (rg *RuleGroup) ResolveInGroup(ctx context.Context, conflict Conflict) (ResolvedConflict, bool, error)
- type RuleGroupOption
- type RuleGroupStats
- type Spec
- func AggregateIDMatches(pattern string) Spec
- func AlwaysMatch() Spec
- func And(a, b Spec) Spec
- func AndMatcher(specs ...Spec) Spec
- func AnyFieldIn(fields ...string) Spec
- func EventTypeIs(t string) Spec
- func FieldChanged(field string) Spec
- func MetadataEq(key string, value any) Spec
- func Not(a Spec) Spec
- func Or(a, b Spec) Spec
- type SyncManager
- type SyncManagerBuilder
- func (b *SyncManagerBuilder) Build() (SyncManager, error)
- func (b *SyncManagerBuilder) Reset() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder
- type SyncOptions
- type SyncResult
- type TestEvent
- type TestEventStore
- func (m *TestEventStore) Close() error
- func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)
- func (m *TestEventStore) Load(_ context.Context, _ Version) ([]EventWithVersion, error)
- func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version) ([]EventWithVersion, error)
- func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)
- func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error
- type TestTransport
- func (m *TestTransport) Close() error
- func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)
- func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
- func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error
- func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error
- type TestVersion
- type TimeoutSettings
- type Transport
- type Validator
- type Version
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AdditiveMergeResolver ¶ added in v0.15.0
type AdditiveMergeResolver struct{}
func (*AdditiveMergeResolver) Resolve ¶ added in v0.15.0
func (r *AdditiveMergeResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type AuditableOption ¶ added in v0.15.0
type AuditableOption interface {
// contains filtered or unexported methods
}
AuditableOption provides configuration options for AuditableResolver.
func WithAuditLogger ¶ added in v0.15.0
func WithAuditLogger(logger Logger) AuditableOption
WithAuditLogger sets a logger for the auditable resolver.
func WithOriginExtractor ¶ added in v0.15.0
func WithOriginExtractor(extractor func(context.Context) string) AuditableOption
WithOriginExtractor sets a function to extract origin information from context.
func WithSessionIDExtractor ¶ added in v0.15.0
func WithSessionIDExtractor(extractor func(context.Context) string) AuditableOption
WithSessionIDExtractor sets a function to extract session ID from context.
func WithUserIDExtractor ¶ added in v0.15.0
func WithUserIDExtractor(extractor func(context.Context) string) AuditableOption
WithUserIDExtractor sets a function to extract user ID from context.
type AuditableResolver ¶ added in v0.15.0
type AuditableResolver struct {
// contains filtered or unexported fields
}
AuditableResolver wraps any ConflictResolver to provide audit trail capabilities.
func NewAuditableResolver ¶ added in v0.15.0
func NewAuditableResolver(resolver ConflictResolver, caretaker MementoCaretaker, opts ...AuditableOption) *AuditableResolver
NewAuditableResolver creates a new auditable resolver that wraps an existing resolver.
func (*AuditableResolver) GetAuditTrail ¶ added in v0.15.0
func (ar *AuditableResolver) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
GetAuditTrail retrieves the audit trail for a specific aggregate.
func (*AuditableResolver) Resolve ¶ added in v0.15.0
func (ar *AuditableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)
Resolve implements ConflictResolver interface with audit trail creation.
type BackoffStrategy ¶
type BackoffStrategy interface {
// NextDelay returns the delay before the next reconnection attempt
NextDelay(attempt int) time.Duration
// Reset resets the backoff strategy after a successful connection
Reset()
}
BackoffStrategy defines how to handle reconnection delays
type BasicValidator ¶ added in v0.15.0
type BasicValidator struct{}
BasicValidator provides basic configuration validation.
func (*BasicValidator) Name ¶ added in v0.15.0
func (v *BasicValidator) Name() string
func (*BasicValidator) Validate ¶ added in v0.15.0
func (v *BasicValidator) Validate(config *RuleConfig) error
type CompositeOption ¶ added in v0.15.0
type CompositeOption interface {
// contains filtered or unexported methods
}
CompositeOption provides configuration options for CompositeResolver.
func WithCompositeLogger ¶ added in v0.15.0
func WithCompositeLogger(logger Logger) CompositeOption
WithCompositeLogger sets a logger for the composite resolver.
type CompositeResolver ¶ added in v0.15.0
type CompositeResolver struct {
// contains filtered or unexported fields
}
CompositeResolver implements ConflictResolver and acts as a root container for multiple rule groups with group-aware resolution.
func NewCompositeResolver ¶ added in v0.15.0
func NewCompositeResolver(fallback ConflictResolver, opts ...CompositeOption) *CompositeResolver
NewCompositeResolver creates a new composite resolver that manages multiple rule groups.
func (*CompositeResolver) AddGroup ¶ added in v0.15.0
func (cr *CompositeResolver) AddGroup(group *RuleGroup) *CompositeResolver
AddGroup adds a rule group to the composite resolver.
func (*CompositeResolver) GetAllGroups ¶ added in v0.15.0
func (cr *CompositeResolver) GetAllGroups() []*RuleGroup
GetAllGroups returns all rule groups managed by this composite resolver.
func (*CompositeResolver) GetGroupStats ¶ added in v0.15.0
func (cr *CompositeResolver) GetGroupStats() []RuleGroupStats
GetGroupStats returns statistics for all managed rule groups.
func (*CompositeResolver) Resolve ¶ added in v0.15.0
func (cr *CompositeResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)
Resolve implements ConflictResolver interface for CompositeResolver. It tries each group in order, then falls back to the global fallback.
type ConfigLoader ¶ added in v0.15.0
type ConfigLoader struct {
// contains filtered or unexported fields
}
ConfigLoader provides dynamic loading and validation of conflict resolution rules from YAML or JSON configuration files with runtime update capabilities.
func NewConfigLoader ¶ added in v0.15.0
func NewConfigLoader(opts ...ConfigLoaderOption) *ConfigLoader
NewConfigLoader creates a new configuration loader.
func (*ConfigLoader) BuildDynamicResolver ¶ added in v0.15.0
func (cl *ConfigLoader) BuildDynamicResolver() (*DynamicResolver, error)
BuildDynamicResolver creates a DynamicResolver from the current configuration.
func (*ConfigLoader) GetCurrentConfig ¶ added in v0.15.0
func (cl *ConfigLoader) GetCurrentConfig() *RuleConfig
GetCurrentConfig returns the current configuration.
func (*ConfigLoader) LoadFromBytes ¶ added in v0.15.0
func (cl *ConfigLoader) LoadFromBytes(data []byte, format string) error
LoadFromBytes loads configuration from raw bytes.
func (*ConfigLoader) LoadFromFile ¶ added in v0.15.0
func (cl *ConfigLoader) LoadFromFile(filepath string) error
LoadFromFile loads configuration from a YAML or JSON file.
type ConfigLoaderOption ¶ added in v0.15.0
type ConfigLoaderOption interface {
// contains filtered or unexported methods
}
ConfigLoaderOption provides configuration options for ConfigLoader.
func WithConfigLogger ¶ added in v0.15.0
func WithConfigLogger(logger Logger) ConfigLoaderOption
WithConfigLogger sets a logger for the config loader.
func WithConfigValidator ¶ added in v0.15.0
func WithConfigValidator(validator ConfigValidator) ConfigLoaderOption
WithConfigValidator adds a configuration validator.
func WithTransformer ¶ added in v0.15.0
func WithTransformer(transformer ConfigTransformer) ConfigLoaderOption
WithTransformer adds a configuration transformer.
func WithWatcher ¶ added in v0.15.0
func WithWatcher(watcher ConfigWatcher) ConfigLoaderOption
WithWatcher adds a configuration change watcher.
type ConfigSettings ¶ added in v0.15.0
type ConfigSettings struct {
DefaultFallback string `json:"default_fallback,omitempty" yaml:"default_fallback,omitempty"`
Timeouts TimeoutSettings `json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
Limits LimitSettings `json:"limits,omitempty" yaml:"limits,omitempty"`
Features map[string]bool `json:"features,omitempty" yaml:"features,omitempty"`
Custom map[string]string `json:"custom,omitempty" yaml:"custom,omitempty"`
}
ConfigSettings contains global configuration settings.
type ConfigTransformer ¶ added in v0.15.0
type ConfigTransformer interface {
Transform(config *RuleConfig) (*RuleConfig, error)
Name() string
}
ConfigTransformer allows modification of configuration during loading.
type ConfigValidator ¶ added in v0.15.0
type ConfigValidator interface {
Validate(config *RuleConfig) error
Name() string
}
ConfigValidator validates configuration before applying it.
type ConfigWatcher ¶ added in v0.15.0
type ConfigWatcher interface {
OnConfigChanged(oldConfig, newConfig *RuleConfig)
OnConfigError(err error)
Name() string
}
ConfigWatcher monitors configuration changes.
type Conflict ¶ added in v0.15.0
type Conflict struct {
EventType string
AggregateID string
ChangedFields []string
Metadata map[string]any
Local EventWithVersion
Remote EventWithVersion
}
Conflict carries the context needed to resolve a detected conflict between local and remote changes. Domain-agnostic by design.
type ConflictResolver ¶
type ConflictResolver interface {
Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
}
ConflictResolver is the Strategy interface for conflict resolution.
type ConflictState ¶ added in v0.15.0
type ConflictState struct {
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type,omitempty"`
LocalVersion string `json:"local_version"`
RemoteVersion string `json:"remote_version"`
LocalData any `json:"local_data,omitempty"`
RemoteData any `json:"remote_data,omitempty"`
ResolvedData any `json:"resolved_data,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
ConflictState captures the state of entities involved in a conflict. This struct is designed to be JSON serializable by storing only the data payloads rather than the full Event interfaces.
type ConnectionStatus ¶
type ConnectionStatus struct {
Connected bool
LastConnected time.Time
ReconnectAttempts int
Error error
}
ConnectionStatus represents the state of the real-time connection
type CursorTransport ¶ added in v0.8.0
type CursorTransport interface {
Transport
// PullWithCursor retrieves events using a cursor-based pagination strategy.
// Returns events, next cursor, and any error.
PullWithCursor(ctx context.Context, since cursor.Cursor, limit int) ([]EventWithVersion, cursor.Cursor, error)
}
CursorTransport extends Transport with cursor-based capabilities.
type DynamicResolver ¶ added in v0.15.0
type DynamicResolver struct {
// contains filtered or unexported fields
}
DynamicResolver dispatches conflicts to strategies based on an ordered rule set. If no rule matches, it uses the fallback resolver. If no fallback is configured, Resolve returns an error.
func NewDynamicResolver ¶ added in v0.15.0
func NewDynamicResolver(opts ...Option) (*DynamicResolver, error)
NewDynamicResolver constructs a DynamicResolver with validation. Invariants:
- At least one rule OR a non-nil fallback must be provided.
- No rule may have a nil matcher or resolver.
- If provided, the Validator must approve the configuration.
func (*DynamicResolver) Resolve ¶ added in v0.15.0
func (d *DynamicResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
Resolve implements the ConflictResolver interface using first-match-wins over the ordered rules, else delegates to fallback.
type Event ¶
Event represents a syncable event in the system. This interface should be implemented by user's event types.
type EventStore ¶
type EventStore interface {
// Store persists an event to the store
Store(ctx context.Context, event Event, version Version) error
// Load retrieves all events since the given version
Load(ctx context.Context, since Version) ([]EventWithVersion, error)
// LoadByAggregate retrieves events for a specific aggregate since the given version
LoadByAggregate(ctx context.Context, aggregateID string, since Version) ([]EventWithVersion, error)
// LatestVersion returns the latest version in the store
LatestVersion(ctx context.Context) (Version, error)
// ParseVersion converts a string representation into a Version
// This allows different storage implementations to handle their own version formats
ParseVersion(ctx context.Context, versionStr string) (Version, error)
// Close closes the store and releases resources
Close() error
}
EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).
type EventWithVersion ¶
type EventWithVersion = types.EventWithVersion
EventWithVersion pairs an event with its version information
type ExponentialBackoff ¶
type ExponentialBackoff struct {
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
}
ExponentialBackoff implements exponential backoff with jitter
func (*ExponentialBackoff) NextDelay ¶
func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration
func (*ExponentialBackoff) Reset ¶
func (eb *ExponentialBackoff) Reset()
type ExtendedMetricsCollector ¶ added in v0.15.0
type ExtendedMetricsCollector interface {
MetricsCollector // Embed existing interface
// New methods for detailed conflict resolution metrics
RecordResolution(ruleName, groupName, decision string, duration time.Duration, success bool)
RecordRuleMatch(ruleName, groupName string)
RecordFallbackUsage(groupName string)
RecordResolutionError(ruleName, groupName, errorType string)
RecordManualReview(reason string)
}
ExtendedMetricsCollector extends the basic MetricsCollector with detailed resolution metrics.
type ExtendedNoOpMetricsCollector ¶ added in v0.15.0
type ExtendedNoOpMetricsCollector struct {
NoOpMetricsCollector // Embed existing no-op collector
}
ExtendedNoOpMetricsCollector extends NoOpMetricsCollector with extended methods.
func (*ExtendedNoOpMetricsCollector) RecordFallbackUsage ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordFallbackUsage(group string)
func (*ExtendedNoOpMetricsCollector) RecordManualReview ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordManualReview(reason string)
func (*ExtendedNoOpMetricsCollector) RecordResolution ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordResolution(rule, group, decision string, dur time.Duration, success bool)
func (*ExtendedNoOpMetricsCollector) RecordResolutionError ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordResolutionError(rule, group, errType string)
func (*ExtendedNoOpMetricsCollector) RecordRuleMatch ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordRuleMatch(rule, group string)
type FirstWriteWinsResolver ¶ added in v0.15.0
type FirstWriteWinsResolver struct{}
func (*FirstWriteWinsResolver) Resolve ¶ added in v0.15.0
func (r *FirstWriteWinsResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type GroupConfig ¶ added in v0.15.0
type GroupConfig struct {
Name string `json:"name" yaml:"name"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Enabled *bool `json:"enabled,omitempty" yaml:"enabled,omitempty"`
Fallback string `json:"fallback,omitempty" yaml:"fallback,omitempty"`
Rules []RuleConfigEntry `json:"rules" yaml:"rules"`
SubGroups []GroupConfig `json:"subgroups,omitempty" yaml:"subgroups,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
GroupConfig represents a rule group configuration.
type Hooks ¶ added in v0.15.0
type Hooks struct {
OnRuleMatched func(conflict Conflict, rule Rule)
OnResolved func(conflict Conflict, result ResolvedConflict)
OnFallback func(conflict Conflict)
OnError func(conflict Conflict, err error)
}
Hooks provides optional callbacks for observability around resolution. All hooks are optional; nil functions are safe no-ops.
type InMemoryMementoCaretaker ¶ added in v0.15.0
type InMemoryMementoCaretaker struct {
// contains filtered or unexported fields
}
InMemoryMementoCaretaker provides an in-memory implementation of MementoCaretaker. For production use, implement a persistent storage backend.
func NewInMemoryMementoCaretaker ¶ added in v0.15.0
func NewInMemoryMementoCaretaker() *InMemoryMementoCaretaker
NewInMemoryMementoCaretaker creates a new in-memory memento caretaker.
func (*InMemoryMementoCaretaker) Delete ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) Delete(ctx context.Context, id string) error
func (*InMemoryMementoCaretaker) Get ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) Get(ctx context.Context, id string) (*ResolutionMemento, error)
func (*InMemoryMementoCaretaker) GetAuditTrail ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
func (*InMemoryMementoCaretaker) List ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)
func (*InMemoryMementoCaretaker) Save ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) Save(ctx context.Context, memento *ResolutionMemento) error
type LastWriteWinsResolver ¶ added in v0.15.0
type LastWriteWinsResolver struct{}
func (*LastWriteWinsResolver) Resolve ¶ added in v0.15.0
func (r *LastWriteWinsResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type LimitSettings ¶ added in v0.15.0
type LimitSettings struct {
MaxConflictsPerBatch int `json:"max_conflicts_per_batch,omitempty" yaml:"max_conflicts_per_batch,omitempty"`
MaxRulesPerGroup int `json:"max_rules_per_group,omitempty" yaml:"max_rules_per_group,omitempty"`
}
LimitSettings contains various limits.
type Logger ¶ added in v0.15.0
type Logger interface {
Debug(msg string, args ...interface{})
Error(msg string, args ...interface{})
}
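Logger is the logging interface used by the composite resolver and by the other components that accept a Logger (the auditable resolver, the config loader, and the logging watcher).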
type LoggingWatcher ¶ added in v0.15.0
type LoggingWatcher struct {
// contains filtered or unexported fields
}
LoggingWatcher logs configuration changes.
func NewLoggingWatcher ¶ added in v0.15.0
func NewLoggingWatcher(logger Logger) *LoggingWatcher
func (*LoggingWatcher) Name ¶ added in v0.15.0
func (w *LoggingWatcher) Name() string
func (*LoggingWatcher) OnConfigChanged ¶ added in v0.15.0
func (w *LoggingWatcher) OnConfigChanged(oldConfig, newConfig *RuleConfig)
func (*LoggingWatcher) OnConfigError ¶ added in v0.15.0
func (w *LoggingWatcher) OnConfigError(err error)
type ManagerOption ¶ added in v0.15.0
type ManagerOption func(*SyncManagerBuilder) error
ManagerOption is a functional option for configuring a SyncManager via NewManager.
func WithAdditiveMerge ¶ added in v0.15.0
func WithAdditiveMerge() ManagerOption
WithAdditiveMerge is a convenience option for the Additive Merge strategy.
func WithBatchSize ¶ added in v0.15.0
func WithBatchSize(n int) ManagerOption
WithBatchSize sets the batch size in SyncOptions.
func WithCompression ¶ added in v0.15.0
func WithCompression(enabled bool) ManagerOption
WithCompression enables data compression during transport.
func WithConflictResolver ¶ added in v0.15.0
func WithConflictResolver(r ConflictResolver) ManagerOption
WithConflictResolver sets the conflict resolution strategy.
func WithFWW ¶ added in v0.15.0
func WithFWW() ManagerOption
WithFWW is a convenience option for the First-Write-Wins strategy.
func WithFilter ¶ added in v0.15.0
func WithFilter(filter func(Event) bool) ManagerOption
WithFilter sets an event filter function.
func WithHTTPTransport ¶ added in v0.15.0
func WithHTTPTransport(baseURL string) ManagerOption
WithHTTPTransport configures the HTTP transport. Note: To avoid import cycles, this function is not available. Create the HTTP transport separately and pass it to WithTransport() instead.
Example:
transport := httptransport.NewTransport("http://localhost:8080/sync", nil, nil, nil)
mgr, err := synckit.NewManager(synckit.WithTransport(transport), ...)
func WithLWW ¶ added in v0.15.0
func WithLWW() ManagerOption
WithLWW is a convenience option for the common Last-Write-Wins strategy.
func WithManagerLogger ¶ added in v0.15.0
func WithManagerLogger(logger *slog.Logger) ManagerOption
WithManagerLogger sets a custom logger for the SyncManager.
func WithNullTransport ¶ added in v0.15.0
func WithNullTransport() ManagerOption
WithNullTransport configures a no-op transport for local-only scenarios. Use this when you only need local event storage without remote synchronization.
func WithPullOnly ¶ added in v0.15.0
func WithPullOnly() ManagerOption
WithPullOnly configures the SyncManager to only pull events.
func WithPushOnly ¶ added in v0.15.0
func WithPushOnly() ManagerOption
WithPushOnly configures the SyncManager to only push events.
func WithSQLite ¶ added in v0.15.0
func WithSQLite(path string) ManagerOption
WithSQLite constructs and injects a SQLite store. Note: This function is provided as an example. In practice, you should create your SQLite store separately and use WithStore() to avoid import cycles.
Example:
store, err := sqlite.NewWithDataSource("app.db")
if err != nil { /* handle */ }
mgr, err := synckit.NewManager(synckit.WithStore(store), ...)
func WithStore ¶ added in v0.15.0
func WithStore(s EventStore) ManagerOption
WithStore injects a pre-built EventStore.
func WithSyncInterval ¶ added in v0.15.0
func WithSyncInterval(interval time.Duration) ManagerOption
WithSyncInterval sets the interval for automatic synchronization.
func WithTimeout ¶ added in v0.15.0
func WithTimeout(d time.Duration) ManagerOption
WithTimeout sets a per-sync timeout in SyncOptions.
func WithTransport ¶ added in v0.15.0
func WithTransport(t Transport) ManagerOption
WithTransport sets a pre-built Transport.
func WithValidation ¶ added in v0.15.0
func WithValidation() ManagerOption
WithValidation enables additional validation checks during sync operations.
type ManualReviewResolver ¶ added in v0.15.0
type ManualReviewResolver struct{ Reason string }
func (*ManualReviewResolver) Resolve ¶ added in v0.15.0
func (r *ManualReviewResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type MatchConditions ¶ added in v0.15.0
type MatchConditions struct {
EventTypes []string `json:"event_types,omitempty" yaml:"event_types,omitempty"`
AggregateIDs []string `json:"aggregate_ids,omitempty" yaml:"aggregate_ids,omitempty"`
Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"`
CustomMatch map[string]string `json:"custom_match,omitempty" yaml:"custom_match,omitempty"`
}
MatchConditions defines when a rule should be applied.
type MementoCaretaker ¶ added in v0.15.0
type MementoCaretaker interface {
// Save stores a resolution memento
Save(ctx context.Context, memento *ResolutionMemento) error
// Get retrieves a specific memento by ID
Get(ctx context.Context, id string) (*ResolutionMemento, error)
// List retrieves mementos based on criteria
List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)
// Delete removes a memento (with appropriate authorization)
Delete(ctx context.Context, id string) error
// GetAuditTrail returns the complete audit trail for an aggregate
GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
}
MementoCaretaker manages the storage and retrieval of resolution mementos. It acts as the caretaker in the Memento pattern.
type MementoCriteria ¶ added in v0.15.0
type MementoCriteria struct {
AggregateID string `json:"aggregate_id,omitempty"`
UserID string `json:"user_id,omitempty"`
ResolverName string `json:"resolver_name,omitempty"`
GroupName string `json:"group_name,omitempty"`
FromTime *time.Time `json:"from_time,omitempty"`
ToTime *time.Time `json:"to_time,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
MementoCriteria defines search criteria for querying mementos.
type MetricsCollector ¶
type MetricsCollector interface {
// RecordSyncDuration records how long a sync operation took
RecordSyncDuration(operation string, duration time.Duration)
// RecordSyncEvents records the number of events pushed and pulled
RecordSyncEvents(pushed, pulled int)
// RecordSyncErrors records sync operation errors by type
RecordSyncErrors(operation string, errorType string)
// RecordConflicts records the number of conflicts resolved
RecordConflicts(resolved int)
}
MetricsCollector provides hooks for collecting sync operation metrics
type MockMetricsCollector ¶
type MockMetricsCollector struct {
DurationCalls []struct {
Operation string
Duration time.Duration
}
EventCalls []struct {
Pushed, Pulled int
}
ErrorCalls []struct {
Operation, ErrorType string
}
ConflictCalls []struct {
Resolved int
}
}
MockMetricsCollector implements MetricsCollector interface for testing
func (*MockMetricsCollector) RecordConflicts ¶
func (m *MockMetricsCollector) RecordConflicts(resolved int)
func (*MockMetricsCollector) RecordSyncDuration ¶
func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
func (*MockMetricsCollector) RecordSyncErrors ¶
func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)
func (*MockMetricsCollector) RecordSyncEvents ¶
func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)
type NoOpMetricsCollector ¶
type NoOpMetricsCollector struct{}
NoOpMetricsCollector is a default implementation that does nothing
func (*NoOpMetricsCollector) RecordConflicts ¶
func (n *NoOpMetricsCollector) RecordConflicts(resolved int)
func (*NoOpMetricsCollector) RecordSyncDuration ¶
func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
func (*NoOpMetricsCollector) RecordSyncErrors ¶
func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)
func (*NoOpMetricsCollector) RecordSyncEvents ¶
func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)
type Notification ¶
// Notification describes a single real-time update notification.
type Notification struct {
// Type is the kind of notification (data_changed, sync_requested, etc.).
Type string
// Source identifies where the change originated.
Source string
// AggregateIDs lists the aggregates that were affected (optional, for filtering).
AggregateIDs []string
// Metadata carries additional context.
Metadata map[string]interface{}
// Timestamp is when the notification was created.
Timestamp time.Time
}
Notification represents a real-time update notification
type NotificationFilter ¶
type NotificationFilter func(notification Notification) bool
NotificationFilter allows filtering which notifications to process
type NotificationHandler ¶
type NotificationHandler func(notification Notification) error
NotificationHandler processes incoming real-time notifications
type ObservableOption ¶ added in v0.15.0
type ObservableOption interface {
// contains filtered or unexported methods
}
ObservableOption provides configuration for ObservableResolver.
func WithGroupName ¶ added in v0.15.0
func WithGroupName(name string) ObservableOption
WithGroupName sets the group name for observability.
func WithMetricsCollector ¶ added in v0.15.0
func WithMetricsCollector(mc MetricsCollector) ObservableOption
WithMetricsCollector sets the metrics collector for the resolver.
func WithObservableLogger ¶ added in v0.15.0
func WithObservableLogger(logger *slog.Logger) ObservableOption
WithObservableLogger sets the logger for the resolver.
func WithResolutionHooks ¶ added in v0.15.0
func WithResolutionHooks(hooks *ResolutionHooks) ObservableOption
WithResolutionHooks sets the resolution hooks for the resolver.
type ObservableResolver ¶ added in v0.15.0
type ObservableResolver struct {
// contains filtered or unexported fields
}
ObservableResolver wraps any ConflictResolver to provide detailed metrics and logging. It implements the Observer pattern for monitoring conflict resolution performance.
func NewObservableResolver ¶ added in v0.15.0
func NewObservableResolver(resolver ConflictResolver, opts ...ObservableOption) *ObservableResolver
NewObservableResolver creates a new ObservableResolver that wraps an existing resolver.
func (*ObservableResolver) Resolve ¶ added in v0.15.0
func (or *ObservableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)
Resolve implements the ConflictResolver interface with added observability.
type Option ¶ added in v0.15.0
type Option interface {
// contains filtered or unexported methods
}
Option implements the Uber-style functional options pattern for construction.
func WithEventTypeRule ¶ added in v0.15.0
func WithEventTypeRule(name string, eventType string, resolver ConflictResolver) Option
WithEventTypeRule is a convenience helper for matching by event type.
func WithFallback ¶ added in v0.15.0
func WithFallback(r ConflictResolver) Option
WithFallback sets the required fallback ConflictResolver.
func WithLogger ¶ added in v0.15.0
WithLogger attaches an optional logger. The logger is accepted as an opaque type to avoid a hard dependency on a logging package.
func WithRule ¶ added in v0.15.0
func WithRule(name string, matcher Spec, resolver ConflictResolver) Option
WithRule appends a rule with a custom matcher and resolver in insertion order.
func WithValidator ¶ added in v0.15.0
WithValidator sets an optional validator for construction-time checks.
type RealtimeNotifier ¶
// RealtimeNotifier delivers real-time push notifications when data changes.
type RealtimeNotifier interface {
// Subscribe starts listening for real-time notifications.
// The handler will be called when new data is available for sync.
Subscribe(ctx context.Context, handler NotificationHandler) error
// Unsubscribe stops listening for notifications.
Unsubscribe() error
// Notify sends a notification to connected clients (server-side).
Notify(ctx context.Context, notification Notification) error
// IsConnected returns true if the real-time connection is active.
IsConnected() bool
// Close closes the notifier connection.
Close() error
}
RealtimeNotifier provides real-time push notifications when data changes. This is separate from Transport to allow for different notification mechanisms (WebSockets, SSE, NATS, etc.) while keeping the sync logic agnostic.
type RealtimeSyncManager ¶
// RealtimeSyncManager extends SyncManager with real-time capabilities.
type RealtimeSyncManager interface {
// SyncManager is embedded, so all of its methods are part of this interface.
SyncManager
// EnableRealtime starts real-time notifications.
EnableRealtime(ctx context.Context) error
// DisableRealtime stops real-time notifications and falls back to polling.
DisableRealtime() error
// IsRealtimeActive returns true if real-time notifications are active.
IsRealtimeActive() bool
// GetConnectionStatus returns the current real-time connection status.
GetConnectionStatus() ConnectionStatus
}
RealtimeSyncManager extends SyncManager with real-time capabilities
func NewRealtimeSyncManager ¶
func NewRealtimeSyncManager(store EventStore, transport Transport, options *RealtimeSyncOptions) RealtimeSyncManager
NewRealtimeSyncManager creates a sync manager with real-time capabilities
type RealtimeSyncOptions ¶
// RealtimeSyncOptions extends SyncOptions with real-time settings.
type RealtimeSyncOptions struct {
// SyncOptions is embedded, so its fields are promoted onto this struct.
SyncOptions
// RealtimeNotifier is the source of push notifications (optional).
RealtimeNotifier RealtimeNotifier
// NotificationFilter selects which notifications are processed.
NotificationFilter NotificationFilter
// DisablePolling stops periodic sync when real-time is active.
DisablePolling bool
// FallbackInterval is the polling interval used when the real-time connection is lost.
FallbackInterval time.Duration
// ReconnectBackoff is the strategy for reconnecting to real-time notifications.
ReconnectBackoff BackoffStrategy
}
RealtimeSyncOptions extends SyncOptions with real-time capabilities
type ResolutionConfig ¶ added in v0.15.0
// ResolutionConfig defines how conflicts should be resolved.
type ResolutionConfig struct {
// Strategy names the resolution strategy.
// NOTE(review): the set of recognized strategy names is not defined here.
Strategy string `json:"strategy" yaml:"strategy"`
// Options holds free-form settings for the strategy; the key is left out of
// the encoded output when the map is empty (omitempty).
Options map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
}
ResolutionConfig defines how conflicts should be resolved.
type ResolutionHooks ¶ added in v0.15.0
// ResolutionHooks groups callbacks for observing conflict resolution events.
// NOTE(review): when each callback fires, and whether nil callbacks are
// tolerated, is implied only by the field names — the invoking code is not
// shown here, so confirm before relying on ordering or nil-safety.
type ResolutionHooks struct {
// OnConflict receives the conflict being handled.
OnConflict func(ctx context.Context, conflict *Conflict)
// OnRuleMatched receives the conflict together with a rule and a rule group.
OnRuleMatched func(ctx context.Context, conflict *Conflict, rule *Rule, group *RuleGroup)
// OnResolution receives a resolution result and a duration.
OnResolution func(ctx context.Context, result *ResolvedConflict, duration time.Duration)
// OnFallback receives the conflict and a rule group.
OnFallback func(ctx context.Context, conflict *Conflict, group *RuleGroup)
// OnError receives the conflict and an error.
OnError func(ctx context.Context, conflict *Conflict, err error)
}
ResolutionHooks provides callbacks for observing conflict resolution events.
type ResolutionMemento ¶ added in v0.15.0
// ResolutionMemento records one conflict resolution: what was in conflict,
// what was decided, and surrounding audit data. All fields except the last
// two carry JSON tags; fields tagged omitempty are left out when empty.
type ResolutionMemento struct {
// ID is the unique identifier for this resolution.
ID string `json:"id"`
// Timestamp is when the resolution occurred.
Timestamp time.Time `json:"timestamp"`
// Conflict information (serialization-friendly).
EventType string `json:"event_type"`
AggregateID string `json:"aggregate_id"`
ChangedFields []string `json:"changed_fields,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
// Resolution result (serialization-friendly).
Decision string `json:"decision"`
Reasons []string `json:"reasons,omitempty"`
// Resolution metadata.
ResolverName string `json:"resolver_name"`
RuleName string `json:"rule_name,omitempty"`
GroupName string `json:"group_name,omitempty"`
Context map[string]any `json:"context,omitempty"`
// Audit trail information.
UserID string `json:"user_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Origin string `json:"origin,omitempty"`
// Performance metrics. With encoding/json a time.Duration is written as its
// integer nanosecond count, not as a string such as "1.5s".
ResolutionDuration time.Duration `json:"resolution_duration"`
// BeforeState is the state before resolution (for rollback); omitted when nil.
BeforeState *ConflictState `json:"before_state,omitempty"`
// AfterState is the state after resolution; omitted when nil.
AfterState *ConflictState `json:"after_state,omitempty"`
// For internal use - not serialized: the `json:"-"` tag excludes both fields,
// so they are not restored when a memento is decoded from JSON.
OriginalConflict Conflict `json:"-"`
ResolvedConflict ResolvedConflict `json:"-"`
}
ResolutionMemento captures the complete state and history of a conflict resolution. It implements the Memento pattern to provide audit trails and rollback capabilities. This struct is designed to be JSON serializable by avoiding Event interface fields.
type ResolvedConflict ¶ added in v0.15.0
// ResolvedConflict captures the decision and any follow-up data.
type ResolvedConflict struct {
// ResolvedEvents holds the versioned events attached to the resolution.
// NOTE(review): whether these are the events to apply is not stated here.
ResolvedEvents []EventWithVersion
// Decision is the decision as a string; the valid values are not defined here.
Decision string
// Reasons lists the reasons recorded for the decision.
Reasons []string
}
ResolvedConflict captures the decision and any follow-up data.
type RetryConfig ¶
// RetryConfig configures the retry behavior for sync operations.
type RetryConfig struct {
// MaxAttempts is the maximum number of retry attempts.
MaxAttempts int
// InitialDelay is the initial delay between retries.
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries.
MaxDelay time.Duration
// Multiplier is the factor by which the delay increases.
Multiplier float64
}
RetryConfig configures the retry behavior for sync operations
type RollbackAnalysis ¶ added in v0.15.0
// RollbackAnalysis describes the implications of rolling back a resolution.
// NOTE(review): field meanings below are read from names and types only; the
// code that populates them is not shown here.
type RollbackAnalysis struct {
// TargetMemento is presumably the resolution whose rollback was analyzed.
TargetMemento *ResolutionMemento `json:"target_memento"`
// AffectedResolutions lists other resolutions reported as affected.
AffectedResolutions []*ResolutionMemento `json:"affected_resolutions"`
// RollbackComplexity is a string rating; the valid values are not defined here.
RollbackComplexity string `json:"rollback_complexity"`
// RequiresReprocessing flags that reprocessing is required.
RequiresReprocessing bool `json:"requires_reprocessing"`
// Warnings holds warning messages; omitted from JSON when empty.
Warnings []string `json:"warnings,omitempty"`
}
RollbackAnalysis provides information about the implications of rolling back a resolution.
type RollbackCapability ¶ added in v0.15.0
type RollbackCapability struct {
// contains filtered or unexported fields
}
RollbackCapability provides methods for analyzing rollback possibilities. Note: Actual rollback implementation would require coordination with the storage layer.
func NewRollbackCapability ¶ added in v0.15.0
func NewRollbackCapability(caretaker MementoCaretaker) *RollbackCapability
NewRollbackCapability creates a new rollback capability analyzer.
func (*RollbackCapability) AnalyzeRollback ¶ added in v0.15.0
func (rc *RollbackCapability) AnalyzeRollback(ctx context.Context, mementoID string) (*RollbackAnalysis, error)
AnalyzeRollback analyzes what would be involved in rolling back a specific resolution.
type Rule ¶ added in v0.15.0
// Rule binds a matcher Spec to the ConflictResolver used when it matches.
type Rule struct {
// Name is the rule's name.
Name string
// Matcher is the Spec that decides whether the rule applies to a conflict.
Matcher Spec
// Resolver is the ConflictResolver bound to Matcher.
Resolver ConflictResolver
}
Rule binds a matcher Specification to a ConflictResolver Strategy. Rules are evaluated in insertion order with first-match-wins semantics.
type RuleConfig ¶ added in v0.15.0
// RuleConfig is the top-level configuration document for conflict resolution
// rules, with matching JSON and YAML tags on every field.
type RuleConfig struct {
// Version, Name and Description identify the configuration document.
// NOTE(review): the accepted Version values are not defined here.
Version string `json:"version" yaml:"version"`
Name string `json:"name" yaml:"name"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
// Metadata holds free-form key/value data; omitted when empty.
Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
// Global settings.
// NOTE(review): if ConfigSettings is a struct, encoding/json never treats it
// as empty, so omitempty has no effect on the JSON output — confirm.
Settings ConfigSettings `json:"settings,omitempty" yaml:"settings,omitempty"`
// Rule groups.
Groups []GroupConfig `json:"groups" yaml:"groups"`
// Global rules (not in any group).
Rules []RuleConfigEntry `json:"rules,omitempty" yaml:"rules,omitempty"`
}
RuleConfig represents the complete configuration structure for conflict resolution rules.
type RuleConfigEntry ¶ added in v0.15.0
// RuleConfigEntry is the configuration of a single rule.
type RuleConfigEntry struct {
// Name and Description identify the rule.
Name string `json:"name" yaml:"name"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
// Enabled is a pointer, so nil (key absent) is distinguishable from an
// explicit false. NOTE(review): how nil is interpreted is not shown here.
Enabled *bool `json:"enabled,omitempty" yaml:"enabled,omitempty"`
// Priority is an integer priority.
// NOTE(review): its ordering direction and how it affects rule evaluation
// order are not defined here — confirm with the loader.
Priority int `json:"priority,omitempty" yaml:"priority,omitempty"`
// Matching conditions.
Conditions MatchConditions `json:"conditions" yaml:"conditions"`
// Resolution configuration.
Resolution ResolutionConfig `json:"resolution" yaml:"resolution"`
// Metadata for custom extensions.
Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
RuleConfigEntry represents a single rule configuration.
type RuleGroup ¶ added in v0.15.0
RuleGroup implements the Composite pattern for organizing rules hierarchically. It allows grouping related rules under a common namespace and provides domain modularity without polluting the top-level rule order.
func NewRuleGroup ¶ added in v0.15.0
func NewRuleGroup(name string, opts ...RuleGroupOption) *RuleGroup
NewRuleGroup creates a new rule group with the given name.
func (*RuleGroup) AddSubGroup ¶ added in v0.15.0
AddSubGroup adds a sub-group to this group, allowing nested hierarchies.
func (*RuleGroup) GetAllRules ¶ added in v0.15.0
GetAllRules returns all rules from this group and its subgroups in order. This flattens the hierarchy for execution by DynamicResolver.
func (*RuleGroup) GetStats ¶ added in v0.15.0
func (rg *RuleGroup) GetStats() RuleGroupStats
GetStats returns statistics about the rule group structure.
func (*RuleGroup) ResolveInGroup ¶ added in v0.15.0
func (rg *RuleGroup) ResolveInGroup(ctx context.Context, conflict Conflict) (ResolvedConflict, bool, error)
ResolveInGroup attempts to resolve a conflict using only rules within this group. This allows for group-specific resolution before falling back to global rules.
type RuleGroupOption ¶ added in v0.15.0
type RuleGroupOption interface {
// contains filtered or unexported methods
}
RuleGroupOption provides configuration options for rule groups.
func WithDescription ¶ added in v0.15.0
func WithDescription(desc string) RuleGroupOption
WithDescription sets a description for the rule group.
func WithEnabled ¶ added in v0.15.0
func WithEnabled(enabled bool) RuleGroupOption
WithEnabled sets the enabled state of the rule group.
func WithGroupFallback ¶ added in v0.15.0
func WithGroupFallback(resolver ConflictResolver) RuleGroupOption
WithGroupFallback sets a fallback resolver specific to this group.
type RuleGroupStats ¶ added in v0.15.0
// RuleGroupStats reports statistics about a rule group's structure. The type
// is recursive: SubGroups holds the same statistics for nested groups.
type RuleGroupStats struct {
// Name is the group's name.
Name string `json:"name"`
// Enabled is the group's enabled state.
Enabled bool `json:"enabled"`
RuleCount int `json:"rule_count"` // Direct rules in this group
SubGroupCount int `json:"subgroup_count"` // Number of direct subgroups
TotalRules int `json:"total_rules"` // Total rules including subgroups
// SubGroups holds the statistics of subgroups; omitted from JSON when empty.
SubGroups []RuleGroupStats `json:"subgroups,omitempty"`
}
RuleGroupStats provides statistical information about rule group structure.
type Spec ¶ added in v0.15.0
Spec is a predicate used to match conflicts to rules. Combinators allow building complex match logic from small, testable pieces.
func AggregateIDMatches ¶ added in v0.15.0
AggregateIDMatches matches when the aggregate ID matches the pattern.
func AlwaysMatch ¶ added in v0.15.0
func AlwaysMatch() Spec
AlwaysMatch returns a spec that always matches.
func AndMatcher ¶ added in v0.15.0
AndMatcher combines multiple specs with AND logic.
func AnyFieldIn ¶ added in v0.15.0
AnyFieldIn matches when any of the conflict's changed fields is in the set.
func EventTypeIs ¶ added in v0.15.0
EventTypeIs matches a specific event type.
func FieldChanged ¶ added in v0.15.0
FieldChanged matches when a specific field has changed.
func MetadataEq ¶ added in v0.15.0
MetadataEq matches when Metadata[key] equals value.
type SyncManager ¶
// SyncManager coordinates synchronization between local and remote stores.
// Sync, Push and Pull each return a *SyncResult describing the operation.
type SyncManager interface {
// Sync performs a bidirectional sync operation.
Sync(ctx context.Context) (*SyncResult, error)
// Push sends local events to remote.
Push(ctx context.Context) (*SyncResult, error)
// Pull retrieves remote events to local.
Pull(ctx context.Context) (*SyncResult, error)
// StartAutoSync begins automatic synchronization at the configured interval.
StartAutoSync(ctx context.Context) error
// StopAutoSync stops automatic synchronization.
StopAutoSync() error
// Subscribe subscribes handler to sync events (optional).
Subscribe(handler func(*SyncResult)) error
// Close shuts down the sync manager.
Close() error
}
SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.
func NewManager ¶ added in v0.15.0
func NewManager(opts ...ManagerOption) (SyncManager, error)
NewManager constructs a SyncManager using functional options on top of the existing builder. It keeps your builder for advanced use while offering a concise, discoverable API.
func NewSyncManager ¶
func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions, logger *slog.Logger) SyncManager
NewSyncManager creates a new sync manager with the provided components
type SyncManagerBuilder ¶
type SyncManagerBuilder struct {
// contains filtered or unexported fields
}
SyncManagerBuilder provides a fluent interface for constructing SyncManager instances.
func NewSyncManagerBuilder ¶
func NewSyncManagerBuilder() *SyncManagerBuilder
NewSyncManagerBuilder creates a new builder with default options.
func (*SyncManagerBuilder) Build ¶
func (b *SyncManagerBuilder) Build() (SyncManager, error)
Build creates a new SyncManager instance with the configured options.
func (*SyncManagerBuilder) Reset ¶
func (b *SyncManagerBuilder) Reset() *SyncManagerBuilder
Reset clears the builder, allowing reuse.
func (*SyncManagerBuilder) WithBatchSize ¶
func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder
WithBatchSize sets the batch size for sync operations.
func (*SyncManagerBuilder) WithCompression ¶
func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder
WithCompression sets whether data compression is used during transport (enabled or disabled per the argument).
func (*SyncManagerBuilder) WithConflictResolver ¶
func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder
WithConflictResolver sets the conflict resolution strategy.
func (*SyncManagerBuilder) WithFilter ¶
func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder
WithFilter sets an event filter function.
func (*SyncManagerBuilder) WithLogger ¶ added in v0.12.0
func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder
WithLogger sets a custom logger for the SyncManager.
func (*SyncManagerBuilder) WithPullOnly ¶
func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder
WithPullOnly configures the SyncManager to only pull events.
func (*SyncManagerBuilder) WithPushOnly ¶
func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder
WithPushOnly configures the SyncManager to only push events.
func (*SyncManagerBuilder) WithStore ¶
func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder
WithStore sets the EventStore for the SyncManager.
func (*SyncManagerBuilder) WithSyncInterval ¶
func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder
WithSyncInterval sets the interval for automatic synchronization.
func (*SyncManagerBuilder) WithTimeout ¶
func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder
WithTimeout sets the maximum duration for sync operations.
func (*SyncManagerBuilder) WithTransport ¶
func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder
WithTransport sets the Transport for the SyncManager.
func (*SyncManagerBuilder) WithValidation ¶
func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder
WithValidation enables additional validation checks during sync operations.
type SyncOptions ¶
// SyncOptions configures the synchronization behavior.
type SyncOptions struct {
// LastCursorLoader loads the last saved cursor for cursor-based syncs.
LastCursorLoader func() cursor.Cursor
// CursorSaver saves the latest cursor after a successful sync.
CursorSaver func(cursor.Cursor) error
// PushOnly indicates this client should only push events, not pull.
// NOTE(review): behavior when PushOnly and PullOnly are both set is not
// specified here — confirm with the manager implementation.
PushOnly bool
// PullOnly indicates this client should only pull events, not push.
PullOnly bool
// ConflictResolver is the resolver to use for handling conflicts.
ConflictResolver ConflictResolver
// Filter can be used to sync only specific events.
Filter func(Event) bool
// BatchSize limits how many events to sync at once.
BatchSize int
// SyncInterval is the interval for automatic periodic sync (0 disables).
SyncInterval time.Duration
// RetryConfig configures retry behavior for sync operations.
RetryConfig *RetryConfig
// EnableValidation enables additional validation checks during sync.
EnableValidation bool
// Timeout sets the maximum duration for sync operations.
Timeout time.Duration
// EnableCompression enables data compression during transport.
EnableCompression bool
// MetricsCollector receives observability hooks (optional).
MetricsCollector MetricsCollector
}
SyncOptions configures the synchronization behavior
type SyncResult ¶
// SyncResult provides information about a completed sync operation.
type SyncResult struct {
// EventsPushed is the number of events sent to remote.
EventsPushed int
// EventsPulled is the number of events received from remote.
EventsPulled int
// ConflictsResolved is the number of conflicts that were resolved.
ConflictsResolved int
// Errors contains any non-fatal errors that occurred during sync.
Errors []error
// StartTime is when the sync operation began.
StartTime time.Time
// Duration is how long the sync took.
Duration time.Duration
// LocalVersion is the local version after sync.
LocalVersion Version
// RemoteVersion is the remote version after sync.
RemoteVersion Version
}
SyncResult provides information about a completed sync operation
type TestEvent ¶
type TestEvent struct{}
TestEvent implements the Event interface for testing.
func (*TestEvent) AggregateID ¶
type TestEventStore ¶
type TestEventStore struct {
// contains filtered or unexported fields
}
TestEventStore implements a simple in-memory event store for testing
func (*TestEventStore) Close ¶
func (m *TestEventStore) Close() error
func (*TestEventStore) LatestVersion ¶
func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)
func (*TestEventStore) Load ¶
func (m *TestEventStore) Load(_ context.Context, _ Version) ([]EventWithVersion, error)
func (*TestEventStore) LoadByAggregate ¶
func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version) ([]EventWithVersion, error)
func (*TestEventStore) ParseVersion ¶
type TestTransport ¶
type TestTransport struct {
// contains filtered or unexported fields
}
TestTransport implements a simple transport for testing
func (*TestTransport) Close ¶
func (m *TestTransport) Close() error
func (*TestTransport) GetLatestVersion ¶
func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)
func (*TestTransport) Pull ¶
func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
func (*TestTransport) Push ¶
func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error
func (*TestTransport) Subscribe ¶
func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error
type TestVersion ¶
type TestVersion struct{}
TestVersion implements the Version interface for testing.
func (*TestVersion) Compare ¶
func (m *TestVersion) Compare(_ Version) int
func (*TestVersion) IsZero ¶
func (m *TestVersion) IsZero() bool
func (*TestVersion) String ¶
func (m *TestVersion) String() string
type TimeoutSettings ¶ added in v0.15.0
// TimeoutSettings contains timeout configurations. Both fields are plain
// ints that are left out of the encoded output when zero (omitempty).
// NOTE(review): the Ms suffix and the *_ms keys suggest milliseconds — confirm
// the unit, and what a zero value means, with the code that reads them.
type TimeoutSettings struct {
// ResolutionTimeoutMs is the resolution timeout.
ResolutionTimeoutMs int `json:"resolution_timeout_ms,omitempty" yaml:"resolution_timeout_ms,omitempty"`
// ManualReviewTimeoutMs is the manual review timeout.
ManualReviewTimeoutMs int `json:"manual_review_timeout_ms,omitempty" yaml:"manual_review_timeout_ms,omitempty"`
}
TimeoutSettings contains timeout configurations.
type Transport ¶
// Transport handles the network communication between clients and servers.
type Transport interface {
// Push sends events to the remote endpoint.
Push(ctx context.Context, events []EventWithVersion) error
// Pull retrieves events from the remote endpoint since the given version.
Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
// GetLatestVersion efficiently retrieves the latest version from remote without pulling events.
GetLatestVersion(ctx context.Context) (Version, error)
// Subscribe listens for real-time updates (optional for polling-based transports).
Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error
// Close closes the transport connection.
Close() error
}
Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.