synckit

package
v0.17.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package synckit - aliases for backward compatibility and future expansion

Package sync provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AdditiveMergeResolver added in v0.15.0

type AdditiveMergeResolver struct{}

func (*AdditiveMergeResolver) Resolve added in v0.15.0

type AuditableOption added in v0.15.0

type AuditableOption interface {
	// contains filtered or unexported methods
}

AuditableOption provides configuration options for AuditableResolver.

func WithAuditLogger added in v0.15.0

func WithAuditLogger(logger Logger) AuditableOption

WithAuditLogger sets a logger for the auditable resolver.

func WithOriginExtractor added in v0.15.0

func WithOriginExtractor(extractor func(context.Context) string) AuditableOption

WithOriginExtractor sets a function to extract origin information from context.

func WithSessionIDExtractor added in v0.15.0

func WithSessionIDExtractor(extractor func(context.Context) string) AuditableOption

WithSessionIDExtractor sets a function to extract session ID from context.

func WithUserIDExtractor added in v0.15.0

func WithUserIDExtractor(extractor func(context.Context) string) AuditableOption

WithUserIDExtractor sets a function to extract user ID from context.

type AuditableResolver added in v0.15.0

type AuditableResolver struct {
	// contains filtered or unexported fields
}

AuditableResolver wraps any ConflictResolver to provide audit trail capabilities.

func NewAuditableResolver added in v0.15.0

func NewAuditableResolver(resolver ConflictResolver, caretaker MementoCaretaker, opts ...AuditableOption) *AuditableResolver

NewAuditableResolver creates a new auditable resolver that wraps an existing resolver.

func (*AuditableResolver) GetAuditTrail added in v0.15.0

func (ar *AuditableResolver) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)

GetAuditTrail retrieves the audit trail for a specific aggregate.

func (*AuditableResolver) Resolve added in v0.15.0

func (ar *AuditableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)

Resolve implements ConflictResolver interface with audit trail creation.

type BackoffStrategy

type BackoffStrategy interface {
	// NextDelay returns the delay before the next reconnection attempt
	NextDelay(attempt int) time.Duration

	// Reset resets the backoff strategy after a successful connection
	Reset()
}

BackoffStrategy defines how to handle reconnection delays

type BasicValidator added in v0.15.0

type BasicValidator struct{}

BasicValidator provides basic configuration validation.

func (*BasicValidator) Name added in v0.15.0

func (v *BasicValidator) Name() string

func (*BasicValidator) Validate added in v0.15.0

func (v *BasicValidator) Validate(config *RuleConfig) error

type CompositeOption added in v0.15.0

type CompositeOption interface {
	// contains filtered or unexported methods
}

CompositeOption provides configuration options for CompositeResolver.

func WithCompositeLogger added in v0.15.0

func WithCompositeLogger(logger Logger) CompositeOption

WithCompositeLogger sets a logger for the composite resolver.

type CompositeResolver added in v0.15.0

type CompositeResolver struct {
	// contains filtered or unexported fields
}

CompositeResolver implements ConflictResolver and acts as a root container for multiple rule groups with group-aware resolution.

func NewCompositeResolver added in v0.15.0

func NewCompositeResolver(fallback ConflictResolver, opts ...CompositeOption) *CompositeResolver

NewCompositeResolver creates a new composite resolver that manages multiple rule groups.

func (*CompositeResolver) AddGroup added in v0.15.0

func (cr *CompositeResolver) AddGroup(group *RuleGroup) *CompositeResolver

AddGroup adds a rule group to the composite resolver.

func (*CompositeResolver) GetAllGroups added in v0.15.0

func (cr *CompositeResolver) GetAllGroups() []*RuleGroup

GetAllGroups returns all rule groups managed by this composite resolver.

func (*CompositeResolver) GetGroupStats added in v0.15.0

func (cr *CompositeResolver) GetGroupStats() []RuleGroupStats

GetGroupStats returns statistics for all managed rule groups.

func (*CompositeResolver) Resolve added in v0.15.0

func (cr *CompositeResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)

Resolve implements ConflictResolver interface for CompositeResolver. It tries each group in order, then falls back to the global fallback.

type ConfigLoader added in v0.15.0

type ConfigLoader struct {
	// contains filtered or unexported fields
}

ConfigLoader provides dynamic loading and validation of conflict resolution rules from YAML or JSON configuration files with runtime update capabilities.

func NewConfigLoader added in v0.15.0

func NewConfigLoader(opts ...ConfigLoaderOption) *ConfigLoader

NewConfigLoader creates a new configuration loader.

func (*ConfigLoader) BuildDynamicResolver added in v0.15.0

func (cl *ConfigLoader) BuildDynamicResolver() (*DynamicResolver, error)

BuildDynamicResolver creates a DynamicResolver from the current configuration.

func (*ConfigLoader) GetCurrentConfig added in v0.15.0

func (cl *ConfigLoader) GetCurrentConfig() *RuleConfig

GetCurrentConfig returns the current configuration.

func (*ConfigLoader) LoadFromBytes added in v0.15.0

func (cl *ConfigLoader) LoadFromBytes(data []byte, format string) error

LoadFromBytes loads configuration from raw bytes.

func (*ConfigLoader) LoadFromFile added in v0.15.0

func (cl *ConfigLoader) LoadFromFile(filepath string) error

LoadFromFile loads configuration from a YAML or JSON file.

type ConfigLoaderOption added in v0.15.0

type ConfigLoaderOption interface {
	// contains filtered or unexported methods
}

ConfigLoaderOption provides configuration options for ConfigLoader.

func WithConfigLogger added in v0.15.0

func WithConfigLogger(logger Logger) ConfigLoaderOption

WithConfigLogger sets a logger for the config loader.

func WithConfigValidator added in v0.15.0

func WithConfigValidator(validator ConfigValidator) ConfigLoaderOption

WithConfigValidator adds a configuration validator.

func WithTransformer added in v0.15.0

func WithTransformer(transformer ConfigTransformer) ConfigLoaderOption

WithTransformer adds a configuration transformer.

func WithWatcher added in v0.15.0

func WithWatcher(watcher ConfigWatcher) ConfigLoaderOption

WithWatcher adds a configuration change watcher.

type ConfigSettings added in v0.15.0

type ConfigSettings struct {
	DefaultFallback string            `json:"default_fallback,omitempty" yaml:"default_fallback,omitempty"`
	Timeouts        TimeoutSettings   `json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
	Limits          LimitSettings     `json:"limits,omitempty" yaml:"limits,omitempty"`
	Features        map[string]bool   `json:"features,omitempty" yaml:"features,omitempty"`
	Custom          map[string]string `json:"custom,omitempty" yaml:"custom,omitempty"`
}

ConfigSettings contains global configuration settings.

type ConfigTransformer added in v0.15.0

type ConfigTransformer interface {
	Transform(config *RuleConfig) (*RuleConfig, error)
	Name() string
}

ConfigTransformer allows modification of configuration during loading.

type ConfigValidator added in v0.15.0

type ConfigValidator interface {
	Validate(config *RuleConfig) error
	Name() string
}

ConfigValidator validates configuration before applying it.

type ConfigWatcher added in v0.15.0

type ConfigWatcher interface {
	OnConfigChanged(oldConfig, newConfig *RuleConfig)
	OnConfigError(err error)
	Name() string
}

ConfigWatcher monitors configuration changes.

type Conflict added in v0.15.0

type Conflict struct {
	EventType     string
	AggregateID   string
	ChangedFields []string
	Metadata      map[string]any

	Local  EventWithVersion
	Remote EventWithVersion
}

Conflict carries the context needed to resolve a detected conflict between local and remote changes. Domain-agnostic by design.

type ConflictResolver

type ConflictResolver interface {
	Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
}

ConflictResolver is the Strategy interface for conflict resolution.

type ConflictState added in v0.15.0

type ConflictState struct {
	AggregateID   string         `json:"aggregate_id"`
	EventType     string         `json:"event_type,omitempty"`
	LocalVersion  string         `json:"local_version"`
	RemoteVersion string         `json:"remote_version"`
	LocalData     any            `json:"local_data,omitempty"`
	RemoteData    any            `json:"remote_data,omitempty"`
	ResolvedData  any            `json:"resolved_data,omitempty"`
	Metadata      map[string]any `json:"metadata,omitempty"`
}

ConflictState captures the state of entities involved in a conflict. This struct is designed to be JSON serializable by storing only the data payloads rather than the full Event interfaces.

type ConnectionStatus

type ConnectionStatus struct {
	Connected         bool
	LastConnected     time.Time
	ReconnectAttempts int
	Error             error
}

ConnectionStatus represents the state of the real-time connection

type CursorTransport added in v0.8.0

type CursorTransport interface {
	Transport

	// PullWithCursor retrieves events using a cursor-based pagination strategy.
	// Returns events, next cursor, and any error.
	PullWithCursor(ctx context.Context, since cursor.Cursor, limit int) ([]EventWithVersion, cursor.Cursor, error)
}

CursorTransport extends Transport with cursor-based capabilities.

type DynamicResolver added in v0.15.0

type DynamicResolver struct {
	// contains filtered or unexported fields
}

DynamicResolver dispatches conflicts to strategies based on an ordered rule set. If no rule matches, it uses the fallback resolver. If no fallback is configured, Resolve returns an error.

func NewDynamicResolver added in v0.15.0

func NewDynamicResolver(opts ...Option) (*DynamicResolver, error)

NewDynamicResolver constructs a DynamicResolver with validation. Invariants: - At least one rule OR a non-nil fallback must be provided - No rule may have a nil matcher or resolver - If provided, Validator must approve the configuration

func (*DynamicResolver) Resolve added in v0.15.0

Resolve implements the ConflictResolver interface using first-match-wins over the ordered rules, else delegates to fallback.

type Event

type Event = types.Event

Event represents a syncable event in the system. This interface should be implemented by user's event types.

type EventStore

type EventStore interface {
	// Store persists an event to the store
	Store(ctx context.Context, event Event, version Version) error

	// Load retrieves all events since the given version
	Load(ctx context.Context, since Version) ([]EventWithVersion, error)

	// LoadByAggregate retrieves events for a specific aggregate since the given version
	LoadByAggregate(ctx context.Context, aggregateID string, since Version) ([]EventWithVersion, error)

	// LatestVersion returns the latest version in the store
	LatestVersion(ctx context.Context) (Version, error)

	// ParseVersion converts a string representation into a Version
	// This allows different storage implementations to handle their own version formats
	ParseVersion(ctx context.Context, versionStr string) (Version, error)

	// Close closes the store and releases resources
	Close() error
}

EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).

type EventWithVersion

type EventWithVersion = types.EventWithVersion

EventWithVersion pairs an event with its version information

type ExponentialBackoff

type ExponentialBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

ExponentialBackoff implements exponential backoff with jitter

func (*ExponentialBackoff) NextDelay

func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration

func (*ExponentialBackoff) Reset

func (eb *ExponentialBackoff) Reset()

type ExtendedMetricsCollector added in v0.15.0

type ExtendedMetricsCollector interface {
	MetricsCollector // Embed existing interface

	// New methods for detailed conflict resolution metrics
	RecordResolution(ruleName, groupName, decision string, duration time.Duration, success bool)
	RecordRuleMatch(ruleName, groupName string)
	RecordFallbackUsage(groupName string)
	RecordResolutionError(ruleName, groupName, errorType string)
	RecordManualReview(reason string)
}

ExtendedMetricsCollector extends the basic MetricsCollector with detailed resolution metrics.

type ExtendedNoOpMetricsCollector added in v0.15.0

type ExtendedNoOpMetricsCollector struct {
	NoOpMetricsCollector // Embed existing no-op collector
}

ExtendedNoOpMetricsCollector extends NoOpMetricsCollector with extended methods.

func (*ExtendedNoOpMetricsCollector) RecordFallbackUsage added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordFallbackUsage(group string)

func (*ExtendedNoOpMetricsCollector) RecordManualReview added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordManualReview(reason string)

func (*ExtendedNoOpMetricsCollector) RecordResolution added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordResolution(rule, group, decision string, dur time.Duration, success bool)

func (*ExtendedNoOpMetricsCollector) RecordResolutionError added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordResolutionError(rule, group, errType string)

func (*ExtendedNoOpMetricsCollector) RecordRuleMatch added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordRuleMatch(rule, group string)

type FirstWriteWinsResolver added in v0.15.0

type FirstWriteWinsResolver struct{}

func (*FirstWriteWinsResolver) Resolve added in v0.15.0

type GroupConfig added in v0.15.0

type GroupConfig struct {
	Name        string                 `json:"name" yaml:"name"`
	Description string                 `json:"description,omitempty" yaml:"description,omitempty"`
	Enabled     *bool                  `json:"enabled,omitempty" yaml:"enabled,omitempty"`
	Fallback    string                 `json:"fallback,omitempty" yaml:"fallback,omitempty"`
	Rules       []RuleConfigEntry      `json:"rules" yaml:"rules"`
	SubGroups   []GroupConfig          `json:"subgroups,omitempty" yaml:"subgroups,omitempty"`
	Metadata    map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

GroupConfig represents a rule group configuration.

type Hooks added in v0.15.0

type Hooks struct {
	OnRuleMatched func(conflict Conflict, rule Rule)
	OnResolved    func(conflict Conflict, result ResolvedConflict)
	OnFallback    func(conflict Conflict)
	OnError       func(conflict Conflict, err error)
}

Hooks provides optional callbacks for observability around resolution. All hooks are optional; nil functions are safe no-ops.

type InMemoryMementoCaretaker added in v0.15.0

type InMemoryMementoCaretaker struct {
	// contains filtered or unexported fields
}

InMemoryMementoCaretaker provides an in-memory implementation of MementoCaretaker. For production use, implement a persistent storage backend.

func NewInMemoryMementoCaretaker added in v0.15.0

func NewInMemoryMementoCaretaker() *InMemoryMementoCaretaker

NewInMemoryMementoCaretaker creates a new in-memory memento caretaker.

func (*InMemoryMementoCaretaker) Delete added in v0.15.0

func (*InMemoryMementoCaretaker) Get added in v0.15.0

func (*InMemoryMementoCaretaker) GetAuditTrail added in v0.15.0

func (c *InMemoryMementoCaretaker) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)

func (*InMemoryMementoCaretaker) List added in v0.15.0

func (*InMemoryMementoCaretaker) Save added in v0.15.0

type LastWriteWinsResolver added in v0.15.0

type LastWriteWinsResolver struct{}

func (*LastWriteWinsResolver) Resolve added in v0.15.0

type LimitSettings added in v0.15.0

type LimitSettings struct {
	MaxConflictsPerBatch int `json:"max_conflicts_per_batch,omitempty" yaml:"max_conflicts_per_batch,omitempty"`
	MaxRulesPerGroup     int `json:"max_rules_per_group,omitempty" yaml:"max_rules_per_group,omitempty"`
}

LimitSettings contains various limits.

type Logger added in v0.15.0

type Logger interface {
	Debug(msg string, args ...interface{})
	Error(msg string, args ...interface{})
}

Logger interface for composite resolver logging.

type LoggingWatcher added in v0.15.0

type LoggingWatcher struct {
	// contains filtered or unexported fields
}

LoggingWatcher logs configuration changes.

func NewLoggingWatcher added in v0.15.0

func NewLoggingWatcher(logger Logger) *LoggingWatcher

func (*LoggingWatcher) Name added in v0.15.0

func (w *LoggingWatcher) Name() string

func (*LoggingWatcher) OnConfigChanged added in v0.15.0

func (w *LoggingWatcher) OnConfigChanged(oldConfig, newConfig *RuleConfig)

func (*LoggingWatcher) OnConfigError added in v0.15.0

func (w *LoggingWatcher) OnConfigError(err error)

type ManagerOption added in v0.15.0

type ManagerOption func(*SyncManagerBuilder) error

ManagerOption is a functional option for configuring a SyncManager via NewManager.

func WithAdditiveMerge added in v0.15.0

func WithAdditiveMerge() ManagerOption

WithAdditiveMerge is convenience for the Additive Merge strategy.

func WithBatchSize added in v0.15.0

func WithBatchSize(n int) ManagerOption

WithBatchSize sets the batch size in SyncOptions.

func WithCompression added in v0.15.0

func WithCompression(enabled bool) ManagerOption

WithCompression enables data compression during transport.

func WithConflictResolver added in v0.15.0

func WithConflictResolver(r ConflictResolver) ManagerOption

WithConflictResolver sets the conflict resolution strategy.

func WithFWW added in v0.15.0

func WithFWW() ManagerOption

WithFWW is convenience for the First-Write-Wins strategy.

func WithFilter added in v0.15.0

func WithFilter(filter func(Event) bool) ManagerOption

WithFilter sets an event filter function.

func WithHTTPTransport added in v0.15.0

func WithHTTPTransport(baseURL string) ManagerOption

WithHTTPTransport configures the HTTP transport. Note: This function is not available to avoid import cycles. Create HTTP transport separately and use WithTransport() instead.

Example:

transport := httptransport.NewTransport("http://localhost:8080/sync", nil, nil, nil)
mgr, err := synckit.NewManager(synckit.WithTransport(transport), ...)

func WithHealthChecker added in v0.17.0

func WithHealthChecker(checker interface {
	// AddCheck adds a health check for a specific component and check type
	AddCheck(checkType string, check interface{})
	// CheckLiveness performs all liveness checks
	CheckLiveness(ctx context.Context) interface{}
	// CheckReadiness performs all readiness checks
	CheckReadiness(ctx context.Context) interface{}
	// CheckStartup performs all startup checks
	CheckStartup(ctx context.Context) interface{}
}) ManagerOption

WithHealthChecker sets a health checker for monitoring sync-kit component health. The health checker will be automatically configured with sync-kit specific health checks.

func WithLWW added in v0.15.0

func WithLWW() ManagerOption

WithLWW is convenience for the common Last-Write-Wins strategy.

func WithManagerLogger added in v0.15.0

func WithManagerLogger(logger *slog.Logger) ManagerOption

WithManagerLogger sets a custom logger for the SyncManager.

func WithMetrics added in v0.17.0

func WithMetrics(collector MetricsCollector) ManagerOption

WithMetrics sets a metrics collector for Prometheus metrics collection. The collector must implement the synckit.MetricsCollector interface.

func WithNullTransport added in v0.15.0

func WithNullTransport() ManagerOption

WithNullTransport configures a no-op transport for local-only scenarios. Use this when you only need local event storage without remote synchronization.

func WithPullOnly added in v0.15.0

func WithPullOnly() ManagerOption

WithPullOnly configures the SyncManager to only pull events.

func WithPushOnly added in v0.15.0

func WithPushOnly() ManagerOption

WithPushOnly configures the SyncManager to only push events.

func WithSQLite added in v0.15.0

func WithSQLite(path string) ManagerOption

WithSQLite constructs and injects a SQLite store. Note: This function is provided as an example. In practice, you should create your SQLite store separately and use WithStore() to avoid import cycles.

Example:

store, err := sqlite.NewWithDataSource("app.db")
if err != nil { /* handle */ }
mgr, err := synckit.NewManager(synckit.WithStore(store), ...)

func WithStore added in v0.15.0

func WithStore(s EventStore) ManagerOption

WithStore injects a pre-built EventStore.

func WithSyncInterval added in v0.15.0

func WithSyncInterval(interval time.Duration) ManagerOption

WithSyncInterval sets the interval for automatic synchronization.

func WithTimeout added in v0.15.0

func WithTimeout(d time.Duration) ManagerOption

WithTimeout sets a per-sync timeout in SyncOptions.

func WithTracing added in v0.17.0

func WithTracing(tracer interface {
	StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
	StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
	StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
	StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
	RecordError(span trace.Span, err error, description string)
	SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
}) ManagerOption

WithTracing sets a tracer for distributed tracing. The tracer must implement the tracing interface defined in SyncOptions.

func WithTransport added in v0.15.0

func WithTransport(t Transport) ManagerOption

WithTransport sets a pre-built Transport.

func WithValidation added in v0.15.0

func WithValidation() ManagerOption

WithValidation enables additional validation checks during sync operations.

type ManualReviewResolver added in v0.15.0

type ManualReviewResolver struct{ Reason string }

func (*ManualReviewResolver) Resolve added in v0.15.0

type MatchConditions added in v0.15.0

type MatchConditions struct {
	EventTypes   []string          `json:"event_types,omitempty" yaml:"event_types,omitempty"`
	AggregateIDs []string          `json:"aggregate_ids,omitempty" yaml:"aggregate_ids,omitempty"`
	Fields       []string          `json:"fields,omitempty" yaml:"fields,omitempty"`
	CustomMatch  map[string]string `json:"custom_match,omitempty" yaml:"custom_match,omitempty"`
}

MatchConditions defines when a rule should be applied.

type MementoCaretaker added in v0.15.0

type MementoCaretaker interface {
	// Save stores a resolution memento
	Save(ctx context.Context, memento *ResolutionMemento) error

	// Get retrieves a specific memento by ID
	Get(ctx context.Context, id string) (*ResolutionMemento, error)

	// List retrieves mementos based on criteria
	List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)

	// Delete removes a memento (with appropriate authorization)
	Delete(ctx context.Context, id string) error

	// GetAuditTrail returns the complete audit trail for an aggregate
	GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
}

MementoCaretaker manages the storage and retrieval of resolution mementos. It acts as the caretaker in the Memento pattern.

type MementoCriteria added in v0.15.0

type MementoCriteria struct {
	AggregateID  string     `json:"aggregate_id,omitempty"`
	UserID       string     `json:"user_id,omitempty"`
	ResolverName string     `json:"resolver_name,omitempty"`
	GroupName    string     `json:"group_name,omitempty"`
	FromTime     *time.Time `json:"from_time,omitempty"`
	ToTime       *time.Time `json:"to_time,omitempty"`
	Limit        int        `json:"limit,omitempty"`
	Offset       int        `json:"offset,omitempty"`
}

MementoCriteria defines search criteria for querying mementos.

type MetricsCollector

type MetricsCollector interface {
	// RecordSyncDuration records how long a sync operation took
	RecordSyncDuration(operation string, duration time.Duration)

	// RecordSyncEvents records the number of events pushed and pulled
	RecordSyncEvents(pushed, pulled int)

	// RecordSyncErrors records sync operation errors by type
	RecordSyncErrors(operation string, errorType string)

	// RecordConflicts records the number of conflicts resolved
	RecordConflicts(resolved int)
}

MetricsCollector provides hooks for collecting sync operation metrics

type MockMetricsCollector

type MockMetricsCollector struct {
	DurationCalls []struct {
		Operation string
		Duration  time.Duration
	}
	EventCalls []struct {
		Pushed, Pulled int
	}
	ErrorCalls []struct {
		Operation, ErrorType string
	}
	ConflictCalls []struct {
		Resolved int
	}
}

MockMetricsCollector implements MetricsCollector interface for testing

func (*MockMetricsCollector) RecordConflicts

func (m *MockMetricsCollector) RecordConflicts(resolved int)

func (*MockMetricsCollector) RecordSyncDuration

func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*MockMetricsCollector) RecordSyncErrors

func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*MockMetricsCollector) RecordSyncEvents

func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)

type NoOpMetricsCollector

type NoOpMetricsCollector struct{}

NoOpMetricsCollector is a default implementation that does nothing

func (*NoOpMetricsCollector) RecordConflicts

func (n *NoOpMetricsCollector) RecordConflicts(resolved int)

func (*NoOpMetricsCollector) RecordSyncDuration

func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*NoOpMetricsCollector) RecordSyncErrors

func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*NoOpMetricsCollector) RecordSyncEvents

func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)

type Notification

type Notification struct {
	// Type of notification (data_changed, sync_requested, etc.)
	Type string

	// Source identifies where the change originated
	Source string

	// AggregateIDs that were affected (optional, for filtering)
	AggregateIDs []string

	// Metadata for additional context
	Metadata map[string]interface{}

	// Timestamp when the notification was created
	Timestamp time.Time
}

Notification represents a real-time update notification

type NotificationFilter

type NotificationFilter func(notification Notification) bool

NotificationFilter allows filtering which notifications to process

type NotificationHandler

type NotificationHandler func(notification Notification) error

NotificationHandler processes incoming real-time notifications

type ObservableOption added in v0.15.0

type ObservableOption interface {
	// contains filtered or unexported methods
}

ObservableOption provides configuration for ObservableResolver.

func WithGroupName added in v0.15.0

func WithGroupName(name string) ObservableOption

WithGroupName sets the group name for observability.

func WithMetricsCollector added in v0.15.0

func WithMetricsCollector(mc MetricsCollector) ObservableOption

WithMetricsCollector sets the metrics collector for the resolver.

func WithObservableLogger added in v0.15.0

func WithObservableLogger(logger *slog.Logger) ObservableOption

WithObservableLogger sets the logger for the resolver.

func WithResolutionHooks added in v0.15.0

func WithResolutionHooks(hooks *ResolutionHooks) ObservableOption

WithResolutionHooks sets the resolution hooks for the resolver.

type ObservableResolver added in v0.15.0

type ObservableResolver struct {
	// contains filtered or unexported fields
}

ObservableResolver wraps any ConflictResolver to provide detailed metrics and logging. It implements the Observer pattern for monitoring conflict resolution performance.

func NewObservableResolver added in v0.15.0

func NewObservableResolver(resolver ConflictResolver, opts ...ObservableOption) *ObservableResolver

NewObservableResolver creates a new ObservableResolver that wraps an existing resolver.

func (*ObservableResolver) Resolve added in v0.15.0

func (or *ObservableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)

Resolve implements the ConflictResolver interface with added observability.

type Option added in v0.15.0

type Option interface {
	// contains filtered or unexported methods
}

Option implements the Uber-style functional options pattern for construction.

func WithEventTypeRule added in v0.15.0

func WithEventTypeRule(name string, eventType string, resolver ConflictResolver) Option

WithEventTypeRule is a convenience helper for matching by event type.

func WithFallback added in v0.15.0

func WithFallback(r ConflictResolver) Option

WithFallback sets the required fallback ConflictResolver.

func WithHooks added in v0.15.0

func WithHooks(h Hooks) Option

WithHooks sets optional observability hooks. Zero-value safe.

func WithLogger added in v0.15.0

func WithLogger(l any) Option

WithLogger attaches an optional logger (opaque type to avoid dependency).

func WithRule added in v0.15.0

func WithRule(name string, matcher Spec, resolver ConflictResolver) Option

WithRule appends a rule with a custom matcher and resolver in insertion order.

func WithValidator added in v0.15.0

func WithValidator(v Validator) Option

WithValidator sets an optional validator for construction-time checks.

type RealtimeNotifier

type RealtimeNotifier interface {
	// Subscribe starts listening for real-time notifications
	// The handler will be called when new data is available for sync
	Subscribe(ctx context.Context, handler NotificationHandler) error

	// Unsubscribe stops listening for notifications
	Unsubscribe() error

	// Notify sends a notification to connected clients (server-side)
	Notify(ctx context.Context, notification Notification) error

	// IsConnected returns true if the real-time connection is active
	IsConnected() bool

	// Close closes the notifier connection
	Close() error
}

RealtimeNotifier provides real-time push notifications when data changes. This is separate from Transport to allow for different notification mechanisms (WebSockets, SSE, NATS, etc.) while keeping the sync logic agnostic.

type RealtimeSyncManager

type RealtimeSyncManager interface {
	SyncManager

	// EnableRealtime starts real-time notifications
	EnableRealtime(ctx context.Context) error

	// DisableRealtime stops real-time notifications and falls back to polling
	DisableRealtime() error

	// IsRealtimeActive returns true if real-time notifications are active
	IsRealtimeActive() bool

	// GetConnectionStatus returns the current real-time connection status
	GetConnectionStatus() ConnectionStatus
}

RealtimeSyncManager extends SyncManager with real-time capabilities

func NewRealtimeSyncManager

func NewRealtimeSyncManager(store EventStore, transport Transport, options *RealtimeSyncOptions) RealtimeSyncManager

NewRealtimeSyncManager creates a sync manager with real-time capabilities

type RealtimeSyncOptions

type RealtimeSyncOptions struct {
	SyncOptions

	// RealtimeNotifier for push notifications (optional)
	RealtimeNotifier RealtimeNotifier

	// NotificationFilter to process only relevant notifications
	NotificationFilter NotificationFilter

	// DisablePolling stops periodic sync when real-time is active
	DisablePolling bool

	// FallbackInterval for polling when real-time connection is lost
	FallbackInterval time.Duration

	// ReconnectBackoff strategy for reconnecting to real-time notifications
	ReconnectBackoff BackoffStrategy
}

RealtimeSyncOptions extends SyncOptions with real-time capabilities

type ResolutionConfig added in v0.15.0

type ResolutionConfig struct {
	Strategy string                 `json:"strategy" yaml:"strategy"`
	Options  map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
}

ResolutionConfig defines how conflicts should be resolved.

type ResolutionHooks added in v0.15.0

type ResolutionHooks struct {
	OnConflict    func(ctx context.Context, conflict *Conflict)
	OnRuleMatched func(ctx context.Context, conflict *Conflict, rule *Rule, group *RuleGroup)
	OnResolution  func(ctx context.Context, result *ResolvedConflict, duration time.Duration)
	OnFallback    func(ctx context.Context, conflict *Conflict, group *RuleGroup)
	OnError       func(ctx context.Context, conflict *Conflict, err error)
}

ResolutionHooks provides callbacks for observing conflict resolution events.

type ResolutionMemento added in v0.15.0

type ResolutionMemento struct {
	// Unique identifier for this resolution
	ID string `json:"id"`

	// Timestamp when the resolution occurred
	Timestamp time.Time `json:"timestamp"`

	// Conflict information (serialization-friendly)
	EventType     string         `json:"event_type"`
	AggregateID   string         `json:"aggregate_id"`
	ChangedFields []string       `json:"changed_fields,omitempty"`
	Metadata      map[string]any `json:"metadata,omitempty"`

	// Resolution result (serialization-friendly)
	Decision string   `json:"decision"`
	Reasons  []string `json:"reasons,omitempty"`

	// Resolution metadata
	ResolverName string         `json:"resolver_name"`
	RuleName     string         `json:"rule_name,omitempty"`
	GroupName    string         `json:"group_name,omitempty"`
	Context      map[string]any `json:"context,omitempty"`

	// Audit trail information
	UserID    string `json:"user_id,omitempty"`
	SessionID string `json:"session_id,omitempty"`
	Origin    string `json:"origin,omitempty"`

	// Performance metrics
	ResolutionDuration time.Duration `json:"resolution_duration"`

	// State before resolution (for rollback)
	BeforeState *ConflictState `json:"before_state,omitempty"`

	// State after resolution
	AfterState *ConflictState `json:"after_state,omitempty"`

	// For internal use - not serialized
	OriginalConflict Conflict         `json:"-"`
	ResolvedConflict ResolvedConflict `json:"-"`
}

ResolutionMemento captures the complete state and history of a conflict resolution. It implements the Memento pattern to provide audit trails and rollback capabilities. This struct is designed to be JSON serializable by avoiding Event interface fields.

type ResolvedConflict added in v0.15.0

type ResolvedConflict struct {
	ResolvedEvents []EventWithVersion
	Decision       string
	Reasons        []string
}

ResolvedConflict captures the decision and any follow-up data.

type RetryConfig

type RetryConfig struct {
	// MaxAttempts is the maximum number of retry attempts
	MaxAttempts int

	// InitialDelay is the initial delay between retries
	InitialDelay time.Duration

	// MaxDelay is the maximum delay between retries
	MaxDelay time.Duration

	// Multiplier is the factor by which the delay increases
	Multiplier float64
}

RetryConfig configures the retry behavior for sync operations

type RollbackAnalysis added in v0.15.0

type RollbackAnalysis struct {
	TargetMemento        *ResolutionMemento   `json:"target_memento"`
	AffectedResolutions  []*ResolutionMemento `json:"affected_resolutions"`
	RollbackComplexity   string               `json:"rollback_complexity"`
	RequiresReprocessing bool                 `json:"requires_reprocessing"`
	Warnings             []string             `json:"warnings,omitempty"`
}

RollbackAnalysis provides information about the implications of rolling back a resolution.

type RollbackCapability added in v0.15.0

type RollbackCapability struct {
	// contains filtered or unexported fields
}

RollbackCapability provides methods for analyzing rollback possibilities. Note: Actual rollback implementation would require coordination with the storage layer.

func NewRollbackCapability added in v0.15.0

func NewRollbackCapability(caretaker MementoCaretaker) *RollbackCapability

NewRollbackCapability creates a new rollback capability analyzer.

func (*RollbackCapability) AnalyzeRollback added in v0.15.0

func (rc *RollbackCapability) AnalyzeRollback(ctx context.Context, mementoID string) (*RollbackAnalysis, error)

AnalyzeRollback analyzes what would be involved in rolling back a specific resolution.

type Rule added in v0.15.0

type Rule struct {
	Name     string
	Matcher  Spec
	Resolver ConflictResolver
}

Rule binds a matcher Specification to a ConflictResolver Strategy. Rules are evaluated in insertion order with first-match-wins semantics.

type RuleConfig added in v0.15.0

type RuleConfig struct {
	Version     string                 `json:"version" yaml:"version"`
	Name        string                 `json:"name" yaml:"name"`
	Description string                 `json:"description,omitempty" yaml:"description,omitempty"`
	Metadata    map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`

	// Global settings
	Settings ConfigSettings `json:"settings,omitempty" yaml:"settings,omitempty"`

	// Rule groups
	Groups []GroupConfig `json:"groups" yaml:"groups"`

	// Global rules (not in any group)
	Rules []RuleConfigEntry `json:"rules,omitempty" yaml:"rules,omitempty"`
}

RuleConfig represents the complete configuration structure for conflict resolution rules.

type RuleConfigEntry added in v0.15.0

type RuleConfigEntry struct {
	Name        string `json:"name" yaml:"name"`
	Description string `json:"description,omitempty" yaml:"description,omitempty"`
	Enabled     *bool  `json:"enabled,omitempty" yaml:"enabled,omitempty"`
	Priority    int    `json:"priority,omitempty" yaml:"priority,omitempty"`

	// Matching conditions
	Conditions MatchConditions `json:"conditions" yaml:"conditions"`

	// Resolution configuration
	Resolution ResolutionConfig `json:"resolution" yaml:"resolution"`

	// Metadata for custom extensions
	Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

RuleConfigEntry represents a single rule configuration.

type RuleGroup added in v0.15.0

type RuleGroup struct {
	Name        string
	Description string
	// contains filtered or unexported fields
}

RuleGroup implements the Composite pattern for organizing rules hierarchically. It allows grouping related rules under a common namespace and provides domain modularity without polluting the top-level rule order.

func NewRuleGroup added in v0.15.0

func NewRuleGroup(name string, opts ...RuleGroupOption) *RuleGroup

NewRuleGroup creates a new rule group with the given name.

func (*RuleGroup) AddRule added in v0.15.0

func (rg *RuleGroup) AddRule(rule Rule) *RuleGroup

AddRule adds a rule to this group.

func (*RuleGroup) AddSubGroup added in v0.15.0

func (rg *RuleGroup) AddSubGroup(subGroup *RuleGroup) *RuleGroup

AddSubGroup adds a sub-group to this group, allowing nested hierarchies.

func (*RuleGroup) Disable added in v0.15.0

func (rg *RuleGroup) Disable() *RuleGroup

func (*RuleGroup) Enable added in v0.15.0

func (rg *RuleGroup) Enable() *RuleGroup

Enable/Disable methods for runtime control

func (*RuleGroup) GetAllRules added in v0.15.0

func (rg *RuleGroup) GetAllRules() []Rule

GetAllRules returns all rules from this group and its subgroups in order. This flattens the hierarchy for execution by DynamicResolver.

func (*RuleGroup) GetStats added in v0.15.0

func (rg *RuleGroup) GetStats() RuleGroupStats

GetStats returns statistics about the rule group structure.

func (*RuleGroup) IsEnabled added in v0.15.0

func (rg *RuleGroup) IsEnabled() bool

func (*RuleGroup) ResolveInGroup added in v0.15.0

func (rg *RuleGroup) ResolveInGroup(ctx context.Context, conflict Conflict) (ResolvedConflict, bool, error)

ResolveInGroup attempts to resolve a conflict using only rules within this group. This allows for group-specific resolution before falling back to global rules.

type RuleGroupOption added in v0.15.0

type RuleGroupOption interface {
	// contains filtered or unexported methods
}

RuleGroupOption provides configuration options for rule groups.

func WithDescription added in v0.15.0

func WithDescription(desc string) RuleGroupOption

WithDescription sets a description for the rule group.

func WithEnabled added in v0.15.0

func WithEnabled(enabled bool) RuleGroupOption

WithEnabled sets the enabled state of the rule group.

func WithGroupFallback added in v0.15.0

func WithGroupFallback(resolver ConflictResolver) RuleGroupOption

WithGroupFallback sets a fallback resolver specific to this group.

type RuleGroupStats added in v0.15.0

type RuleGroupStats struct {
	Name          string           `json:"name"`
	Enabled       bool             `json:"enabled"`
	RuleCount     int              `json:"rule_count"`     // Direct rules in this group
	SubGroupCount int              `json:"subgroup_count"` // Number of direct subgroups
	TotalRules    int              `json:"total_rules"`    // Total rules including subgroups
	SubGroups     []RuleGroupStats `json:"subgroups,omitempty"`
}

RuleGroupStats provides statistical information about rule group structure.

type Spec added in v0.15.0

type Spec func(Conflict) bool

Spec is a predicate used to match conflicts to rules. Combinators allow building complex match logic from small, testable pieces.

func AggregateIDMatches added in v0.15.0

func AggregateIDMatches(pattern string) Spec

AggregateIDMatches matches when the aggregate ID matches the pattern.

func AlwaysMatch added in v0.15.0

func AlwaysMatch() Spec

AlwaysMatch returns a spec that always matches.

func And added in v0.15.0

func And(a, b Spec) Spec

And returns a spec that requires both specs to match.

func AndMatcher added in v0.15.0

func AndMatcher(specs ...Spec) Spec

AndMatcher combines multiple specs with AND logic.

func AnyFieldIn added in v0.15.0

func AnyFieldIn(fields ...string) Spec

AnyFieldIn matches when any of the conflict's changed fields is in the set.

func EventTypeIs added in v0.15.0

func EventTypeIs(t string) Spec

EventTypeIs matches a specific event type.

func FieldChanged added in v0.15.0

func FieldChanged(field string) Spec

FieldChanged matches when a specific field has changed.

func MetadataEq added in v0.15.0

func MetadataEq(key string, value any) Spec

MetadataEq matches when Metadata[key] equals value.

func Not added in v0.15.0

func Not(a Spec) Spec

Not returns a spec that negates the provided spec.

func Or added in v0.15.0

func Or(a, b Spec) Spec

Or returns a spec that requires at least one spec to match.

type SyncManager

type SyncManager interface {
	// Sync performs a bidirectional sync operation
	Sync(ctx context.Context) (*SyncResult, error)

	// Push sends local events to remote
	Push(ctx context.Context) (*SyncResult, error)

	// Pull retrieves remote events to local
	Pull(ctx context.Context) (*SyncResult, error)

	// StartAutoSync begins automatic synchronization at the configured interval
	StartAutoSync(ctx context.Context) error

	// StopAutoSync stops automatic synchronization
	StopAutoSync() error

	// Subscribe to sync events (optional)
	Subscribe(handler func(*SyncResult)) error

	// Close shuts down the sync manager
	Close() error
}

SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.

func NewManager added in v0.15.0

func NewManager(opts ...ManagerOption) (SyncManager, error)

NewManager constructs a SyncManager using functional options on top of the existing builder. It keeps your builder for advanced use while offering a concise, discoverable API.

func NewSyncManager

func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions, logger *slog.Logger) SyncManager

NewSyncManager creates a new sync manager with the provided components

type SyncManagerBuilder

type SyncManagerBuilder struct {
	// contains filtered or unexported fields
}

SyncManagerBuilder provides a fluent interface for constructing SyncManager instances.

func NewSyncManagerBuilder

func NewSyncManagerBuilder() *SyncManagerBuilder

NewSyncManagerBuilder creates a new builder with default options.

func (*SyncManagerBuilder) Build

func (b *SyncManagerBuilder) Build() (SyncManager, error)

Build creates a new SyncManager instance with the configured options.

func (*SyncManagerBuilder) Reset

Reset clears the builder, allowing reuse.

func (*SyncManagerBuilder) WithBatchSize

func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder

WithBatchSize sets the batch size for sync operations.

func (*SyncManagerBuilder) WithCompression

func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder

WithCompression enables data compression during transport.

func (*SyncManagerBuilder) WithConflictResolver

func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder

WithConflictResolver sets the conflict resolution strategy.

func (*SyncManagerBuilder) WithFilter

func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder

WithFilter sets an event filter function.

func (*SyncManagerBuilder) WithHealthChecker added in v0.17.0

func (b *SyncManagerBuilder) WithHealthChecker(checker interface{}) *SyncManagerBuilder

WithHealthChecker sets a health checker for monitoring sync-kit component health.

func (*SyncManagerBuilder) WithLogger added in v0.12.0

func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder

WithLogger sets a custom logger for the SyncManager.

func (*SyncManagerBuilder) WithMetricsCollector added in v0.17.0

func (b *SyncManagerBuilder) WithMetricsCollector(collector MetricsCollector) *SyncManagerBuilder

WithMetricsCollector sets a metrics collector for observability.

func (*SyncManagerBuilder) WithPullOnly

func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder

WithPullOnly configures the SyncManager to only pull events.

func (*SyncManagerBuilder) WithPushOnly

func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder

WithPushOnly configures the SyncManager to only push events.

func (*SyncManagerBuilder) WithStore

func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder

WithStore sets the EventStore for the SyncManager.

func (*SyncManagerBuilder) WithSyncInterval

func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder

WithSyncInterval sets the interval for automatic synchronization.

func (*SyncManagerBuilder) WithTimeout

func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder

WithTimeout sets the maximum duration for sync operations.

func (*SyncManagerBuilder) WithTracer added in v0.17.0

func (b *SyncManagerBuilder) WithTracer(tracer interface {
	StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
	StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
	StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
	StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
	RecordError(span trace.Span, err error, description string)
	SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
}) *SyncManagerBuilder

WithTracer sets a tracer for distributed tracing.

func (*SyncManagerBuilder) WithTransport

func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder

WithTransport sets the Transport for the SyncManager.

func (*SyncManagerBuilder) WithValidation

func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder

WithValidation enables additional validation checks during sync operations.

type SyncOptions

type SyncOptions struct {
	// LastCursorLoader loads the last saved cursor for cursor-based syncs
	LastCursorLoader func() cursor.Cursor

	// CursorSaver saves the latest cursor after a successful sync
	CursorSaver func(cursor.Cursor) error

	// PushOnly indicates this client should only push events, not pull
	PushOnly bool

	// PullOnly indicates this client should only pull events, not push
	PullOnly bool

	// ConflictResolver to use for handling conflicts
	ConflictResolver ConflictResolver

	// Filter can be used to sync only specific events
	Filter func(Event) bool

	// BatchSize limits how many events to sync at once
	BatchSize int

	// SyncInterval for automatic periodic sync (0 disables)
	SyncInterval time.Duration

	// RetryConfig configures retry behavior for sync operations
	RetryConfig *RetryConfig

	// EnableValidation enables additional validation checks during sync
	EnableValidation bool

	// Timeout sets the maximum duration for sync operations
	Timeout time.Duration

	// EnableCompression enables data compression during transport
	EnableCompression bool

	// MetricsCollector for observability hooks (optional)
	MetricsCollector MetricsCollector

	// Tracer for distributed tracing (optional)
	Tracer interface {
		// StartSyncOperation starts a new span for a sync operation
		StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
		// StartTransportOperation starts a new span for transport operations
		StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
		// StartStorageOperation starts a new span for storage operations
		StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
		// StartConflictResolution starts a new span for conflict resolution
		StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
		// RecordError records an error on a span
		RecordError(span trace.Span, err error, description string)
		// SetSyncResult sets sync result attributes on a span
		SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
	}
}

SyncOptions configures the synchronization behavior

type SyncResult

type SyncResult struct {
	// EventsPushed is the number of events sent to remote
	EventsPushed int

	// EventsPulled is the number of events received from remote
	EventsPulled int

	// ConflictsResolved is the number of conflicts that were resolved
	ConflictsResolved int

	// Errors contains any non-fatal errors that occurred during sync
	Errors []error

	// StartTime is when the sync operation began
	StartTime time.Time

	// Duration is how long the sync took
	Duration time.Duration

	// LocalVersion is the local version after sync
	LocalVersion Version

	// RemoteVersion is the remote version after sync
	RemoteVersion Version
}

SyncResult provides information about a completed sync operation

type TestEvent

type TestEvent struct{}

TestEvent implements Event interface for testing

func (*TestEvent) AggregateID

func (m *TestEvent) AggregateID() string

func (*TestEvent) Data

func (m *TestEvent) Data() interface{}

func (*TestEvent) ID

func (m *TestEvent) ID() string

func (*TestEvent) Metadata

func (m *TestEvent) Metadata() map[string]interface{}

func (*TestEvent) Type

func (m *TestEvent) Type() string

type TestEventStore

type TestEventStore struct {
	// contains filtered or unexported fields
}

TestEventStore implements a simple in-memory event store for testing

func (*TestEventStore) Close

func (m *TestEventStore) Close() error

func (*TestEventStore) LatestVersion

func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)

func (*TestEventStore) Load

func (*TestEventStore) LoadByAggregate

func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version) ([]EventWithVersion, error)

func (*TestEventStore) ParseVersion

func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)

func (*TestEventStore) Store

func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error

type TestTransport

type TestTransport struct {
	// contains filtered or unexported fields
}

TestTransport implements a simple transport for testing

func (*TestTransport) Close

func (m *TestTransport) Close() error

func (*TestTransport) GetLatestVersion

func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)

func (*TestTransport) Pull

func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)

func (*TestTransport) Push

func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error

func (*TestTransport) Subscribe

func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error

type TestVersion

type TestVersion struct{}

TestVersion implements Version interface for testing

func (*TestVersion) Compare

func (m *TestVersion) Compare(_ Version) int

func (*TestVersion) IsZero

func (m *TestVersion) IsZero() bool

func (*TestVersion) String

func (m *TestVersion) String() string

type TimeoutSettings added in v0.15.0

type TimeoutSettings struct {
	ResolutionTimeoutMs   int `json:"resolution_timeout_ms,omitempty" yaml:"resolution_timeout_ms,omitempty"`
	ManualReviewTimeoutMs int `json:"manual_review_timeout_ms,omitempty" yaml:"manual_review_timeout_ms,omitempty"`
}

TimeoutSettings contains timeout configurations.

type Transport

type Transport interface {
	// Push sends events to the remote endpoint
	Push(ctx context.Context, events []EventWithVersion) error

	// Pull retrieves events from the remote endpoint since the given version
	Pull(ctx context.Context, since Version) ([]EventWithVersion, error)

	// GetLatestVersion efficiently retrieves the latest version from remote without pulling events
	GetLatestVersion(ctx context.Context) (Version, error)

	// Subscribe listens for real-time updates (optional for polling-based transports)
	Subscribe(ctx context.Context, handler func([]EventWithVersion) error) error

	// Close closes the transport connection
	Close() error
}

Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.

type Validator added in v0.15.0

type Validator interface {
	Validate(options *resolverOptions) error
}

Validator can perform configuration validation at construction time. Return nil if configuration is valid; return error to prevent construction.

type Version

type Version = types.Version

Version represents a point-in-time snapshot for sync operations. Users can implement different versioning strategies (timestamps, hashes, vector clocks).

Directories

Path Synopsis
Package types contains shared types used across the synckit ecosystem.
Package types contains shared types used across the synckit ecosystem.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL