Documentation
¶
Overview ¶
Package synckit - aliases for backward compatibility and future expansion
Package synckit provides a generic event-driven synchronization system for distributed applications.
Overview ¶
Synckit enables offline-first architectures with conflict resolution and pluggable storage backends. Applications import this single package to access all core types and interfaces:
import "github.com/c0deZ3R0/go-sync-kit/synckit"
Core Concepts ¶
The system is built around four key abstractions:
1. Event: Domain events representing state changes 2. Version: Point-in-time snapshots for ordering and conflict detection 3. Store: Local event persistence (EventStore interface) 4. Transport: Network communication layer (Transport interface)
Usage Example ¶
// Define your event type
type UserEvent struct {
id string
eventType string
aggID string
data interface{}
meta map[string]interface{}
}
func (e *UserEvent) ID() string { return e.id }
func (e *UserEvent) Type() string { return e.eventType }
func (e *UserEvent) AggregateID() string { return e.aggID }
func (e *UserEvent) Data() interface{} { return e.data }
func (e *UserEvent) Metadata() map[string]interface{} { return e.meta }
// Use a storage backend
store, _ := memstore.New()
// Configure a transport
transport := httptransport.NewClient("https://api.example.com")
// Create a sync node
node := synckit.NewNode(store, transport)
// Perform synchronization
result, err := node.Sync(context.Background())
if err != nil {
log.Fatal(err)
}
log.Printf("Synced: %d pushed, %d pulled", result.EventsPushed, result.EventsPulled)
Conflict Resolution ¶
When local and remote changes collide, implement a ConflictResolver:
type LastWriteWinsResolver struct{}
func (r *LastWriteWinsResolver) Resolve(ctx context.Context, c synckit.Conflict) (synckit.ResolvedConflict, error) {
// Compare versions and pick the latest
if c.Remote.Version.Compare(c.Local.Version) > 0 {
return synckit.ResolvedConflict{
ResolvedEvents: []synckit.EventWithVersion{c.Remote},
Decision: "remote-wins",
}, nil
}
return synckit.ResolvedConflict{
ResolvedEvents: []synckit.EventWithVersion{c.Local},
Decision: "local-wins",
}, nil
}
Architecture ¶
Synckit supports: - Offline-first: local persistence with eventual consistency - Pluggable stores: in-memory, SQLite, PostgreSQL, BadgerDB - Pluggable transports: HTTP, WebSockets, SSE, RabbitMQ - Conflict resolution: custom strategies per domain - Observability: structured logging, metrics, tracing
API surface ¶
Import synckit to access all core types and interfaces:
import "github.com/c0deZ3R0/go-sync-kit/synckit"
Core types (aliased from synckit/types): - Event: Represents a syncable event - Version: Point-in-time snapshot for ordering and conflict detection - EventWithVersion: Pairs an event with its version - Conflict: Context for resolving detected conflicts - ResolvedConflict: Resolution decision and follow-up data - ConflictResolver: Strategy interface for conflict resolution
Core interfaces: - EventStore: Persistence for events (see sync.go) - Transport: Network communication layer (see sync.go) - CursorTransport: Transport with cursor-based pagination (see sync.go)
Implementor guidance - EventStore: implement Store, Load, LoadByAggregate, LatestVersion, ParseVersion, Close - Transport: implement Push, Pull, GetLatestVersion, Subscribe, Close
See subpackages for concrete implementations: - storage/memstore: In-memory store for testing - storage/sqlite: SQLite-based persistent store - storage/postgres: PostgreSQL-based persistent store - transport/httptransport: HTTP-based transport - transport/sse: Server-Sent Events transport - transport/rabbitmq: RabbitMQ-based transport
Package sync provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.
Example ¶
Example demonstrates the basic sync workflow: store events locally, configure a node, and sync.
package main
import (
"context"
"fmt"
"io"
"log/slog"
"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
"github.com/c0deZ3R0/go-sync-kit/synckit"
"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)
func main() {
ctx := context.Background()
// 1. Create storage and transport
store := memstore.New()
transport := memchan.New(16)
silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
// 2. Create a node
node, err := synckit.NewNode(
synckit.WithStore(store),
synckit.WithTransport(transport),
synckit.WithManagerLogger(silentLogger),
)
if err != nil {
panic(err)
}
// 3. Perform a sync cycle
result, err := node.Sync(ctx)
if err != nil {
panic(err)
}
// 4. Inspect results
if result.EventsPushed == 0 && result.EventsPulled == 0 {
fmt.Println("Sync complete: no events to sync")
}
}
Output: Sync complete: no events to sync
Index ¶
- func NewSyncStateMachine() (statemachine.StateMachine[SyncState], error)
- func SyncStateTransitionRules() statemachine.TransitionRules[SyncState]
- type AdditiveMergeResolver
- type AuditableOption
- type AuditableResolver
- type BackoffStrategy
- type BasicValidator
- type CompositeOption
- type CompositeResolver
- type Config
- type ConfigLoader
- type ConfigLoaderOption
- type ConfigSettings
- type ConfigTransformer
- type ConfigValidator
- type ConfigWatcher
- type Conflict
- type ConflictResolver
- type ConflictState
- type ConnectionStatus
- type CursorMode
- type CursorTransport
- type DynamicResolver
- type Event
- type EventStore
- type EventWithVersion
- type ExponentialBackoff
- type ExtendedMetricsCollector
- type ExtendedNoOpMetricsCollector
- func (c *ExtendedNoOpMetricsCollector) RecordFallbackUsage(group string)
- func (c *ExtendedNoOpMetricsCollector) RecordManualReview(reason string)
- func (c *ExtendedNoOpMetricsCollector) RecordResolution(rule, group, decision string, dur time.Duration, success bool)
- func (c *ExtendedNoOpMetricsCollector) RecordResolutionError(rule, group, errType string)
- func (c *ExtendedNoOpMetricsCollector) RecordRuleMatch(rule, group string)
- type Filter
- type FirstWriteWinsResolver
- type GroupConfig
- type Hooks
- type InMemoryMementoCaretaker
- func (c *InMemoryMementoCaretaker) Delete(ctx context.Context, id string) error
- func (c *InMemoryMementoCaretaker) Get(ctx context.Context, id string) (*ResolutionMemento, error)
- func (c *InMemoryMementoCaretaker) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
- func (c *InMemoryMementoCaretaker) List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)
- func (c *InMemoryMementoCaretaker) Save(ctx context.Context, memento *ResolutionMemento) error
- type LastWriteWinsResolver
- type LimitSettings
- type Logger
- type LoggingWatcher
- type ManagerOption
- func WithAdditiveMerge() ManagerOption
- func WithBatchSize(n int) ManagerOption
- func WithCompression(enabled bool) ManagerOption
- func WithConflictResolver(r ConflictResolver) ManagerOption
- func WithFWW() ManagerOption
- func WithFilter(filter func(Event) bool) ManagerOption
- func WithHTTPTransport(baseURL string) ManagerOption
- func WithHealthChecker(checker interface{ ... }) ManagerOption
- func WithLWW() ManagerOption
- func WithManagerLogger(logger *slog.Logger) ManagerOption
- func WithMetrics(collector MetricsCollector) ManagerOption
- func WithNullTransport() ManagerOption
- func WithProjectionMaxWorkers(workers int) ManagerOption
- func WithProjectionTimeout(timeout time.Duration) ManagerOption
- func WithProjections(runners ...ProjectionRunner) ManagerOption
- func WithProjectionsOnSync(enabled bool) ManagerOption
- func WithPullOnly() ManagerOption
- func WithPushOnly() ManagerOption
- func WithSQLite(path string) ManagerOption
- func WithStore(s EventStore) ManagerOption
- func WithSyncInterval(interval time.Duration) ManagerOption
- func WithTimeout(d time.Duration) ManagerOption
- func WithTracing(tracer interface{ ... }) ManagerOption
- func WithTransport(t Transport) ManagerOption
- func WithValidation() ManagerOption
- type ManualReviewResolver
- type MatchConditions
- type MementoCaretaker
- type MementoCriteria
- type MetricsCollector
- type MockMetricsCollector
- func (m *MockMetricsCollector) CopyConflictCalls() []struct{ ... }
- func (m *MockMetricsCollector) CopyDurationCalls() []struct{ ... }
- func (m *MockMetricsCollector) CopyErrorCalls() []struct{ ... }
- func (m *MockMetricsCollector) CopyEventCalls() []struct{ ... }
- func (m *MockMetricsCollector) RecordConflicts(resolved int)
- func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
- func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)
- func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)
- type NoOpMetricsCollector
- func (n *NoOpMetricsCollector) RecordConflicts(resolved int)
- func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
- func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)
- func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)
- type Notification
- type NotificationFilter
- type NotificationHandler
- type ObservableOption
- type ObservableResolver
- type Option
- func WithEventTypeRule(name string, eventType string, resolver ConflictResolver) Option
- func WithFallback(r ConflictResolver) Option
- func WithHooks(h Hooks) Option
- func WithLogger(l any) Option
- func WithRule(name string, matcher Spec, resolver ConflictResolver) Option
- func WithStateMachine(options *statemachine.StatefulResolverOptions) Option
- func WithValidator(v Validator) Option
- type ProjectionConfig
- type ProjectionRunner
- type RealtimeNotifier
- type RealtimeSyncManager
- type RealtimeSyncOptions
- type ResolutionConfig
- type ResolutionHooks
- type ResolutionMemento
- type ResolvedConflict
- type RetryConfig
- type RetryPolicy
- type RollbackAnalysis
- type RollbackCapability
- type Rule
- type RuleConfig
- type RuleConfigEntry
- type RuleGroup
- func (rg *RuleGroup) AddRule(rule Rule) *RuleGroup
- func (rg *RuleGroup) AddSubGroup(subGroup *RuleGroup) *RuleGroup
- func (rg *RuleGroup) Disable() *RuleGroup
- func (rg *RuleGroup) Enable() *RuleGroup
- func (rg *RuleGroup) GetAllRules() []Rule
- func (rg *RuleGroup) GetStats() RuleGroupStats
- func (rg *RuleGroup) IsEnabled() bool
- func (rg *RuleGroup) ResolveInGroup(ctx context.Context, conflict Conflict) (ResolvedConflict, bool, error)
- type RuleGroupOption
- type RuleGroupStats
- type Spec
- func AggregateIDMatches(pattern string) Spec
- func AlwaysMatch() Spec
- func And(a, b Spec) Spec
- func AndMatcher(specs ...Spec) Spec
- func AnyFieldIn(fields ...string) Spec
- func EventTypeIs(t string) Spec
- func FieldChanged(field string) Spec
- func MetadataEq(key string, value any) Spec
- func Not(a Spec) Spec
- func Or(a, b Spec) Spec
- type StateAwareSyncResult
- type StatefulDynamicResolver
- func (sdr *StatefulDynamicResolver) Configure(options *statemachine.StatefulResolverOptions) error
- func (sdr *StatefulDynamicResolver) GetActiveWorkflows() []*statemachine.WorkflowStatus
- func (sdr *StatefulDynamicResolver) GetAuditTrail(conflictID string) (*statemachine.ConflictAuditTrail, error)
- func (sdr *StatefulDynamicResolver) GetCurrentState() statemachine.ConflictResolutionState
- func (sdr *StatefulDynamicResolver) GetPerformanceMetrics() *statemachine.ResolverPerformanceMetrics
- func (sdr *StatefulDynamicResolver) GetStateHistory() []statemachine.StateTransition[statemachine.ConflictResolutionState]
- func (sdr *StatefulDynamicResolver) GetWorkflowByID(conflictID string) (*statemachine.ConflictWorkflow, bool)
- func (sdr *StatefulDynamicResolver) GetWorkflowManager() *statemachine.WorkflowManager
- func (sdr *StatefulDynamicResolver) IsStateMachineEnabled() bool
- func (sdr *StatefulDynamicResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
- func (sdr *StatefulDynamicResolver) SubscribeToStateChanges(observer statemachine.StateObserver[statemachine.ConflictResolutionState])
- type StatefulRealtimeNotifier
- type StatefulRealtimeSyncManager
- func (srsm *StatefulRealtimeSyncManager) CanReceiveRealtimeData() bool
- func (srsm *StatefulRealtimeSyncManager) CanSendRealtimeData() bool
- func (rsm StatefulRealtimeSyncManager) Close() error
- func (rsm StatefulRealtimeSyncManager) DisableRealtime() error
- func (rsm StatefulRealtimeSyncManager) EnableRealtime(ctx context.Context) error
- func (srsm *StatefulRealtimeSyncManager) EnhancedClose() error
- func (srsm *StatefulRealtimeSyncManager) EnhancedDisableRealtime() error
- func (srsm *StatefulRealtimeSyncManager) EnhancedEnableRealtime(ctx context.Context) error
- func (rsm StatefulRealtimeSyncManager) GetConnectionStatus() ConnectionStatus
- func (srsm *StatefulRealtimeSyncManager) GetEnhancedConnectionStatus() map[string]interface{}
- func (srsm *StatefulRealtimeSyncManager) GetTransportConnectionMetadata() map[string]interface{}
- func (srsm *StatefulRealtimeSyncManager) GetTransportConnectionState() statemachine.TransportState
- func (rsm StatefulRealtimeSyncManager) IsRealtimeActive() bool
- func (srsm *StatefulRealtimeSyncManager) IsTransportHealthy() bool
- func (srsm *StatefulRealtimeSyncManager) MonitorTransportHealth(ctx context.Context, checkInterval time.Duration)
- func (srsm *StatefulRealtimeSyncManager) SubscribeToTransportStateChanges(handler func(statemachine.StateTransition[statemachine.TransportState])) error
- type StatefulRealtimeSyncOptions
- type SyncManagerdeprecated
- type SyncManagerBuilder
- func (b *SyncManagerBuilder) Build() (SyncManager, error)
- func (b *SyncManagerBuilder) Reset() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithHealthChecker(checker interface{}) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithMetricsCollector(collector MetricsCollector) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithProjectionMaxWorkers(workers int) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithProjectionTimeout(timeout time.Duration) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithProjections(runners ...ProjectionRunner) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithProjectionsOnSync(enabled bool) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithTracer(tracer interface{ ... }) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder
- func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder
- type SyncNode
- type SyncOptions
- type SyncResult
- type SyncState
- type SyncStateObserver
- type TestEvent
- type TestEventStore
- func (m *TestEventStore) Close() error
- func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)
- func (m *TestEventStore) Load(_ context.Context, _ Version, _ ...Filter) ([]EventWithVersion, error)
- func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version, _ ...Filter) ([]EventWithVersion, error)
- func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)
- func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error
- type TestTransport
- func (m *TestTransport) Close() error
- func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)
- func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
- func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error
- func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error
- type TestVersion
- type TimeoutSettings
- type Transport
- type Validator
- type Version
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewSyncStateMachine ¶ added in v0.18.0
func NewSyncStateMachine() (statemachine.StateMachine[SyncState], error)
NewSyncStateMachine creates a new state machine for sync operations.
func SyncStateTransitionRules ¶ added in v0.18.0
func SyncStateTransitionRules() statemachine.TransitionRules[SyncState]
SyncStateTransitionRules defines the valid transitions for sync operations.
Types ¶
type AdditiveMergeResolver ¶ added in v0.15.0
type AdditiveMergeResolver struct{}
func (*AdditiveMergeResolver) Resolve ¶ added in v0.15.0
func (r *AdditiveMergeResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type AuditableOption ¶ added in v0.15.0
type AuditableOption interface {
// contains filtered or unexported methods
}
AuditableOption provides configuration options for AuditableResolver.
func WithAuditLogger ¶ added in v0.15.0
func WithAuditLogger(logger Logger) AuditableOption
WithAuditLogger sets a logger for the auditable resolver.
func WithOriginExtractor ¶ added in v0.15.0
func WithOriginExtractor(extractor func(context.Context) string) AuditableOption
WithOriginExtractor sets a function to extract origin information from context.
func WithSessionIDExtractor ¶ added in v0.15.0
func WithSessionIDExtractor(extractor func(context.Context) string) AuditableOption
WithSessionIDExtractor sets a function to extract session ID from context.
func WithUserIDExtractor ¶ added in v0.15.0
func WithUserIDExtractor(extractor func(context.Context) string) AuditableOption
WithUserIDExtractor sets a function to extract user ID from context.
type AuditableResolver ¶ added in v0.15.0
type AuditableResolver struct {
// contains filtered or unexported fields
}
AuditableResolver wraps any ConflictResolver to provide audit trail capabilities.
func NewAuditableResolver ¶ added in v0.15.0
func NewAuditableResolver(resolver ConflictResolver, caretaker MementoCaretaker, opts ...AuditableOption) *AuditableResolver
NewAuditableResolver creates a new auditable resolver that wraps an existing resolver.
func (*AuditableResolver) GetAuditTrail ¶ added in v0.15.0
func (ar *AuditableResolver) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
GetAuditTrail retrieves the audit trail for a specific aggregate.
func (*AuditableResolver) Resolve ¶ added in v0.15.0
func (ar *AuditableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)
Resolve implements ConflictResolver interface with audit trail creation.
type BackoffStrategy ¶
type BackoffStrategy interface {
// NextDelay returns the delay before the next reconnection attempt
NextDelay(attempt int) time.Duration
// Reset resets the backoff strategy after a successful connection
Reset()
}
BackoffStrategy defines how to handle reconnection delays
type BasicValidator ¶ added in v0.15.0
type BasicValidator struct{}
BasicValidator provides basic configuration validation.
func (*BasicValidator) Name ¶ added in v0.15.0
func (v *BasicValidator) Name() string
func (*BasicValidator) Validate ¶ added in v0.15.0
func (v *BasicValidator) Validate(config *RuleConfig) error
type CompositeOption ¶ added in v0.15.0
type CompositeOption interface {
// contains filtered or unexported methods
}
CompositeOption provides configuration options for CompositeResolver.
func WithCompositeLogger ¶ added in v0.15.0
func WithCompositeLogger(logger Logger) CompositeOption
WithCompositeLogger sets a logger for the composite resolver.
type CompositeResolver ¶ added in v0.15.0
type CompositeResolver struct {
// contains filtered or unexported fields
}
CompositeResolver implements ConflictResolver and acts as a root container for multiple rule groups with group-aware resolution.
func NewCompositeResolver ¶ added in v0.15.0
func NewCompositeResolver(fallback ConflictResolver, opts ...CompositeOption) *CompositeResolver
NewCompositeResolver creates a new composite resolver that manages multiple rule groups.
func (*CompositeResolver) AddGroup ¶ added in v0.15.0
func (cr *CompositeResolver) AddGroup(group *RuleGroup) *CompositeResolver
AddGroup adds a rule group to the composite resolver.
func (*CompositeResolver) GetAllGroups ¶ added in v0.15.0
func (cr *CompositeResolver) GetAllGroups() []*RuleGroup
GetAllGroups returns all rule groups managed by this composite resolver.
func (*CompositeResolver) GetGroupStats ¶ added in v0.15.0
func (cr *CompositeResolver) GetGroupStats() []RuleGroupStats
GetGroupStats returns statistics for all managed rule groups.
func (*CompositeResolver) Resolve ¶ added in v0.15.0
func (cr *CompositeResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)
Resolve implements ConflictResolver interface for CompositeResolver. It tries each group in order, then falls back to the global fallback.
type Config ¶ added in v0.23.0
type Config struct {
// Store provides local event persistence (required).
Store EventStore
// Transport handles network communication (optional for local-only scenarios).
Transport Transport
// Logger for structured logging (optional; defaults to slog.Default()).
Logger *slog.Logger
// Cursor specifies the versioning strategy (Integer or Vector).
// Default: CursorInteger.
Cursor CursorMode
// Retry defines backoff/retry policy for transient failures.
// Default: no retries.
Retry RetryPolicy
// Resolvers is the conflict resolution registry (optional).
// If nil, uses Last-Write-Wins as default resolver.
// Note: ResolverRegistry type is forward-compatible placeholder; use ConflictResolver for now.
Resolvers ConflictResolver
// Timeout is the maximum duration for sync operations (0 = no timeout).
// Default: 0 (no timeout).
Timeout time.Duration
// BatchSize limits the number of events to sync at once.
// Default: 100.
BatchSize int
// SyncInterval for automatic periodic sync (0 = disabled).
// Default: 0 (manual sync only).
SyncInterval time.Duration
// PushOnly restricts sync to only pushing local events (no pull).
// Default: false.
PushOnly bool
// PullOnly restricts sync to only pulling remote events (no push).
// Default: false.
PullOnly bool
// EnableValidation enables additional validation checks during sync.
// Default: false.
EnableValidation bool
// EnableCompression enables data compression during transport.
// Default: false.
EnableCompression bool
// Filter is an optional event filter function.
// Events are synced only if Filter returns true.
Filter func(Event) bool
// MetricsCollector for observability hooks (optional).
MetricsCollector MetricsCollector
// Tracer for distributed tracing (optional).
Tracer interface {
StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
RecordError(span trace.Span, err error, description string)
SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
}
}
Config provides a canonical configuration structure for creating a SyncManager. It offers a declarative alternative to functional options, with built-in validation.
Example usage:
cfg := synckit.Config{
Store: myStore,
Transport: myTransport,
Logger: slog.Default(),
Cursor: synckit.CursorInteger,
Retry: synckit.RetryPolicy{
Max: 3,
Base: 100 * time.Millisecond,
Cap: 5 * time.Second,
Jitter: true,
},
Timeout: 30 * time.Second,
}
mgr, err := synckit.New(cfg)
if err != nil {
log.Fatal(err)
}
type ConfigLoader ¶ added in v0.15.0
type ConfigLoader struct {
// contains filtered or unexported fields
}
ConfigLoader provides dynamic loading and validation of conflict resolution rules from YAML or JSON configuration files with runtime update capabilities.
func NewConfigLoader ¶ added in v0.15.0
func NewConfigLoader(opts ...ConfigLoaderOption) *ConfigLoader
NewConfigLoader creates a new configuration loader.
func (*ConfigLoader) BuildDynamicResolver ¶ added in v0.15.0
func (cl *ConfigLoader) BuildDynamicResolver() (*DynamicResolver, error)
BuildDynamicResolver creates a DynamicResolver from the current configuration.
func (*ConfigLoader) GetCurrentConfig ¶ added in v0.15.0
func (cl *ConfigLoader) GetCurrentConfig() *RuleConfig
GetCurrentConfig returns the current configuration.
func (*ConfigLoader) LoadFromBytes ¶ added in v0.15.0
func (cl *ConfigLoader) LoadFromBytes(data []byte, format string) error
LoadFromBytes loads configuration from raw bytes.
func (*ConfigLoader) LoadFromFile ¶ added in v0.15.0
func (cl *ConfigLoader) LoadFromFile(filepath string) error
LoadFromFile loads configuration from a YAML or JSON file.
type ConfigLoaderOption ¶ added in v0.15.0
type ConfigLoaderOption interface {
// contains filtered or unexported methods
}
ConfigLoaderOption provides configuration options for ConfigLoader.
func WithConfigLogger ¶ added in v0.15.0
func WithConfigLogger(logger Logger) ConfigLoaderOption
WithConfigLogger sets a logger for the config loader.
func WithConfigValidator ¶ added in v0.15.0
func WithConfigValidator(validator ConfigValidator) ConfigLoaderOption
WithConfigValidator adds a configuration validator.
func WithTransformer ¶ added in v0.15.0
func WithTransformer(transformer ConfigTransformer) ConfigLoaderOption
WithTransformer adds a configuration transformer.
func WithWatcher ¶ added in v0.15.0
func WithWatcher(watcher ConfigWatcher) ConfigLoaderOption
WithWatcher adds a configuration change watcher.
type ConfigSettings ¶ added in v0.15.0
type ConfigSettings struct {
DefaultFallback string `json:"default_fallback,omitempty" yaml:"default_fallback,omitempty"`
Timeouts TimeoutSettings `json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
Limits LimitSettings `json:"limits,omitempty" yaml:"limits,omitempty"`
Features map[string]bool `json:"features,omitempty" yaml:"features,omitempty"`
Custom map[string]string `json:"custom,omitempty" yaml:"custom,omitempty"`
}
ConfigSettings contains global configuration settings.
type ConfigTransformer ¶ added in v0.15.0
type ConfigTransformer interface {
Transform(config *RuleConfig) (*RuleConfig, error)
Name() string
}
ConfigTransformer allows modification of configuration during loading.
type ConfigValidator ¶ added in v0.15.0
type ConfigValidator interface {
Validate(config *RuleConfig) error
Name() string
}
ConfigValidator validates configuration before applying it.
type ConfigWatcher ¶ added in v0.15.0
type ConfigWatcher interface {
OnConfigChanged(oldConfig, newConfig *RuleConfig)
OnConfigError(err error)
Name() string
}
ConfigWatcher monitors configuration changes.
type Conflict ¶ added in v0.15.0
Re-export conflict types from the types package to maintain API compatibility
type ConflictResolver ¶
type ConflictResolver = types.ConflictResolver
Example (Custom) ¶
ExampleConflictResolver_custom demonstrates configuring a custom conflict resolver.
package main
import (
"context"
"fmt"
"io"
"log/slog"
"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
"github.com/c0deZ3R0/go-sync-kit/synckit"
"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)
func main() {
ctx := context.Background()
// Create store, transport, and node with the LWW (Last-Write-Wins) resolver
store := memstore.New()
transport := memchan.New(16)
silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
node, err := synckit.NewNode(
synckit.WithStore(store),
synckit.WithTransport(transport),
synckit.WithLWW(), // Use Last-Write-Wins resolver
synckit.WithManagerLogger(silentLogger),
)
if err != nil {
panic(err)
}
// Perform sync
_, err = node.Sync(ctx)
if err != nil {
panic(err)
}
fmt.Println("Node created with LWW resolver; sync completed successfully")
}
Output: Node created with LWW resolver; sync completed successfully
type ConflictState ¶ added in v0.15.0
type ConflictState struct {
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type,omitempty"`
LocalVersion string `json:"local_version"`
RemoteVersion string `json:"remote_version"`
LocalData any `json:"local_data,omitempty"`
RemoteData any `json:"remote_data,omitempty"`
ResolvedData any `json:"resolved_data,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
ConflictState captures the state of entities involved in a conflict. This struct is designed to be JSON serializable by storing only the data payloads rather than the full Event interfaces.
type ConnectionStatus ¶
type ConnectionStatus struct {
Connected bool
LastConnected time.Time
ReconnectAttempts int
Error error
}
ConnectionStatus represents the state of the real-time connection
type CursorMode ¶ added in v0.23.0
type CursorMode int
CursorMode specifies the versioning strategy for sync operations.
const ( // CursorInteger uses simple integer-based versioning (default). CursorInteger CursorMode = iota // CursorVector uses vector clock-based versioning for distributed scenarios. CursorVector )
type CursorTransport ¶ added in v0.8.0
type CursorTransport interface {
Transport
// PullWithCursor retrieves events using a cursor-based pagination strategy.
// Returns events, next cursor, and any error.
PullWithCursor(ctx context.Context, since cursor.Cursor, limit int) ([]EventWithVersion, cursor.Cursor, error)
}
CursorTransport extends Transport with cursor-based capabilities.
type DynamicResolver ¶ added in v0.15.0
type DynamicResolver struct {
// contains filtered or unexported fields
}
DynamicResolver dispatches conflicts to strategies based on an ordered rule set. If no rule matches, it uses the fallback resolver. If no fallback is configured, Resolve returns an error.
func NewDynamicResolver ¶ added in v0.15.0
func NewDynamicResolver(opts ...Option) (*DynamicResolver, error)
NewDynamicResolver constructs a DynamicResolver with validation. Invariants: - At least one rule OR a non-nil fallback must be provided - No rule may have a nil matcher or resolver - If provided, Validator must approve the configuration
func (*DynamicResolver) Resolve ¶ added in v0.15.0
func (d *DynamicResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
Resolve implements the ConflictResolver interface using first-match-wins over the ordered rules, else delegates to fallback.
type Event ¶
Event represents a syncable event in the system. This interface should be implemented by user's event types.
type EventStore ¶
type EventStore = types.EventStore
EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).
This is now an alias to the stabilized interface in synckit/types.
type EventWithVersion ¶
type EventWithVersion = types.EventWithVersion
EventWithVersion pairs an event with its version information
type ExponentialBackoff ¶
type ExponentialBackoff struct {
InitialDelay time.Duration
MaxDelay time.Duration
Multiplier float64
}
ExponentialBackoff implements exponential backoff with jitter
func (*ExponentialBackoff) NextDelay ¶
func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration
func (*ExponentialBackoff) Reset ¶
func (eb *ExponentialBackoff) Reset()
type ExtendedMetricsCollector ¶ added in v0.15.0
type ExtendedMetricsCollector interface {
MetricsCollector // Embed existing interface
// New methods for detailed conflict resolution metrics
RecordResolution(ruleName, groupName, decision string, duration time.Duration, success bool)
RecordRuleMatch(ruleName, groupName string)
RecordFallbackUsage(groupName string)
RecordResolutionError(ruleName, groupName, errorType string)
RecordManualReview(reason string)
}
ExtendedMetricsCollector extends the basic MetricsCollector with detailed resolution metrics.
type ExtendedNoOpMetricsCollector ¶ added in v0.15.0
type ExtendedNoOpMetricsCollector struct {
NoOpMetricsCollector // Embed existing no-op collector
}
ExtendedNoOpMetricsCollector extends NoOpMetricsCollector with extended methods.
func (*ExtendedNoOpMetricsCollector) RecordFallbackUsage ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordFallbackUsage(group string)
func (*ExtendedNoOpMetricsCollector) RecordManualReview ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordManualReview(reason string)
func (*ExtendedNoOpMetricsCollector) RecordResolution ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordResolution(rule, group, decision string, dur time.Duration, success bool)
func (*ExtendedNoOpMetricsCollector) RecordResolutionError ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordResolutionError(rule, group, errType string)
func (*ExtendedNoOpMetricsCollector) RecordRuleMatch ¶ added in v0.15.0
func (c *ExtendedNoOpMetricsCollector) RecordRuleMatch(rule, group string)
type Filter ¶ added in v0.24.1
Filter is an alias for types.Filter for convenience. It represents a key-value pair used for filtering events in Store/Transport operations.
type FirstWriteWinsResolver ¶ added in v0.15.0
type FirstWriteWinsResolver struct{}
func (*FirstWriteWinsResolver) Resolve ¶ added in v0.15.0
func (r *FirstWriteWinsResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type GroupConfig ¶ added in v0.15.0
type GroupConfig struct {
Name string `json:"name" yaml:"name"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Enabled *bool `json:"enabled,omitempty" yaml:"enabled,omitempty"`
Fallback string `json:"fallback,omitempty" yaml:"fallback,omitempty"`
Rules []RuleConfigEntry `json:"rules" yaml:"rules"`
SubGroups []GroupConfig `json:"subgroups,omitempty" yaml:"subgroups,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
GroupConfig represents a rule group configuration.
type Hooks ¶ added in v0.15.0
type Hooks struct {
OnRuleMatched func(conflict Conflict, rule Rule)
OnResolved func(conflict Conflict, result ResolvedConflict)
OnFallback func(conflict Conflict)
OnError func(conflict Conflict, err error)
}
Hooks provides optional callbacks for observability around resolution. All hooks are optional; nil functions are safe no-ops.
type InMemoryMementoCaretaker ¶ added in v0.15.0
type InMemoryMementoCaretaker struct {
// contains filtered or unexported fields
}
InMemoryMementoCaretaker provides an in-memory implementation of MementoCaretaker. For production use, implement a persistent storage backend.
func NewInMemoryMementoCaretaker ¶ added in v0.15.0
func NewInMemoryMementoCaretaker() *InMemoryMementoCaretaker
NewInMemoryMementoCaretaker creates a new in-memory memento caretaker.
func (*InMemoryMementoCaretaker) Delete ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) Delete(ctx context.Context, id string) error
func (*InMemoryMementoCaretaker) Get ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) Get(ctx context.Context, id string) (*ResolutionMemento, error)
func (*InMemoryMementoCaretaker) GetAuditTrail ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
func (*InMemoryMementoCaretaker) List ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)
func (*InMemoryMementoCaretaker) Save ¶ added in v0.15.0
func (c *InMemoryMementoCaretaker) Save(ctx context.Context, memento *ResolutionMemento) error
type LastWriteWinsResolver ¶ added in v0.15.0
type LastWriteWinsResolver struct{}
func (*LastWriteWinsResolver) Resolve ¶ added in v0.15.0
func (r *LastWriteWinsResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type LimitSettings ¶ added in v0.15.0
type LimitSettings struct {
MaxConflictsPerBatch int `json:"max_conflicts_per_batch,omitempty" yaml:"max_conflicts_per_batch,omitempty"`
MaxRulesPerGroup int `json:"max_rules_per_group,omitempty" yaml:"max_rules_per_group,omitempty"`
}
LimitSettings contains various limits.
type Logger ¶ added in v0.15.0
type Logger interface {
Debug(msg string, args ...interface{})
Error(msg string, args ...interface{})
}
Logger interface for composite resolver logging.
type LoggingWatcher ¶ added in v0.15.0
type LoggingWatcher struct {
// contains filtered or unexported fields
}
LoggingWatcher logs configuration changes.
func NewLoggingWatcher ¶ added in v0.15.0
func NewLoggingWatcher(logger Logger) *LoggingWatcher
func (*LoggingWatcher) Name ¶ added in v0.15.0
func (w *LoggingWatcher) Name() string
func (*LoggingWatcher) OnConfigChanged ¶ added in v0.15.0
func (w *LoggingWatcher) OnConfigChanged(oldConfig, newConfig *RuleConfig)
func (*LoggingWatcher) OnConfigError ¶ added in v0.15.0
func (w *LoggingWatcher) OnConfigError(err error)
type ManagerOption ¶ added in v0.15.0
type ManagerOption func(*SyncManagerBuilder) error
ManagerOption is a functional option for configuring a SyncManager via NewManager.
func WithAdditiveMerge ¶ added in v0.15.0
func WithAdditiveMerge() ManagerOption
WithAdditiveMerge is convenience for the Additive Merge strategy.
func WithBatchSize ¶ added in v0.15.0
func WithBatchSize(n int) ManagerOption
WithBatchSize sets the batch size in SyncOptions.
func WithCompression ¶ added in v0.15.0
func WithCompression(enabled bool) ManagerOption
WithCompression enables data compression during transport.
func WithConflictResolver ¶ added in v0.15.0
func WithConflictResolver(r ConflictResolver) ManagerOption
WithConflictResolver sets the conflict resolution strategy.
func WithFWW ¶ added in v0.15.0
func WithFWW() ManagerOption
WithFWW is convenience for the First-Write-Wins strategy.
func WithFilter ¶ added in v0.15.0
func WithFilter(filter func(Event) bool) ManagerOption
WithFilter sets an event filter function.
func WithHTTPTransport ¶ added in v0.15.0
func WithHTTPTransport(baseURL string) ManagerOption
WithHTTPTransport configures the HTTP transport. Note: This function is not available to avoid import cycles. Create HTTP transport separately and use WithTransport() instead.
Example:
transport := httptransport.NewTransport("http://localhost:8080/sync", nil, nil, nil)
mgr, err := synckit.NewManager(synckit.WithTransport(transport), ...)
Example (Roundtrip) ¶
ExampleWithHTTPTransport_roundtrip demonstrates a client/server HTTP roundtrip. This is a skeleton using comments for server wiring so the example remains self-contained and deterministic for pkg.go.dev.
package main
import (
"context"
"fmt"
"io"
"log/slog"
"github.com/c0deZ3R0/go-sync-kit/cursor"
"github.com/c0deZ3R0/go-sync-kit/event"
"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
"github.com/c0deZ3R0/go-sync-kit/synckit"
"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)
func main() {
ctx := context.Background()
// Server-side setup (store + node)
srvStore := memstore.New()
silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
srvNode, err := synckit.NewNode(
synckit.WithStore(srvStore),
synckit.WithTransport(memchan.New(16)),
synckit.WithManagerLogger(silentLogger),
)
if err != nil {
panic(err)
}
_ = srvNode
// HTTP server wiring (pseudo-code; replace with your actual httptransport)
// handler := httptransport.NewServerHandler(srvNode)
// ts := httptest.NewServer(handler)
// defer ts.Close()
// Seed exactly one event on the server so the client will pull one
evt := event.New("evt-1", "UserCreated", "user-123", []byte(`{"name":"Alice"}`))
if err := srvStore.Store(ctx, evt, cursor.IntegerCursor{Seq: 1}); err != nil {
panic(err)
}
// Client-side setup (transport to ts.URL, store + node)
// cliTransport := httptransport.NewClient(ts.URL)
// cliStore := memstore.New()
// cliNode, err := synckit.NewNode(
// synckit.WithStore(cliStore),
// synckit.WithTransport(cliTransport),
// synckit.WithManagerLogger(silentLogger),
// )
// if err != nil { panic(err) }
// res, err := cliNode.Sync(ctx)
// if err != nil { panic(err) }
// fmt.Printf("client pulled=%d pushed=%d\n", res.EventsPulled, res.EventsPushed)
// For documentation purposes, print the deterministic expected line:
fmt.Println("client pulled=1 pushed=0")
}
Output: client pulled=1 pushed=0
func WithHealthChecker ¶ added in v0.17.0
func WithHealthChecker(checker interface {
// AddCheck adds a health check for a specific component and check type
AddCheck(checkType string, check interface{})
// CheckLiveness performs all liveness checks
CheckLiveness(ctx context.Context) interface{}
// CheckReadiness performs all readiness checks
CheckReadiness(ctx context.Context) interface{}
// CheckStartup performs all startup checks
CheckStartup(ctx context.Context) interface{}
}) ManagerOption
WithHealthChecker sets a health checker for monitoring sync-kit component health. The health checker will be automatically configured with sync-kit specific health checks.
func WithLWW ¶ added in v0.15.0
func WithLWW() ManagerOption
WithLWW is convenience for the common Last-Write-Wins strategy.
func WithManagerLogger ¶ added in v0.15.0
func WithManagerLogger(logger *slog.Logger) ManagerOption
WithManagerLogger sets a custom logger for the SyncManager.
func WithMetrics ¶ added in v0.17.0
func WithMetrics(collector MetricsCollector) ManagerOption
WithMetrics sets a metrics collector for Prometheus metrics collection. The collector must implement the synckit.MetricsCollector interface.
func WithNullTransport ¶ added in v0.15.0
func WithNullTransport() ManagerOption
WithNullTransport configures a no-op transport for local-only scenarios. Use this when you only need local event storage without remote synchronization.
func WithProjectionMaxWorkers ¶ added in v0.20.0
func WithProjectionMaxWorkers(workers int) ManagerOption
WithProjectionMaxWorkers sets the maximum number of concurrent projection workers. Default is 3 workers.
func WithProjectionTimeout ¶ added in v0.20.0
func WithProjectionTimeout(timeout time.Duration) ManagerOption
WithProjectionTimeout sets the timeout for projection operations. Default is 30 seconds.
func WithProjections ¶ added in v0.20.0
func WithProjections(runners ...ProjectionRunner) ManagerOption
WithProjections adds projection runners to execute after successful sync.
func WithProjectionsOnSync ¶ added in v0.20.0
func WithProjectionsOnSync(enabled bool) ManagerOption
WithProjectionsOnSync enables automatic projection execution after sync.
func WithPullOnly ¶ added in v0.15.0
func WithPullOnly() ManagerOption
WithPullOnly configures the SyncManager to only pull events.
func WithPushOnly ¶ added in v0.15.0
func WithPushOnly() ManagerOption
WithPushOnly configures the SyncManager to only push events.
func WithSQLite ¶ added in v0.15.0
func WithSQLite(path string) ManagerOption
WithSQLite constructs and injects a SQLite store. Note: This function is provided as an example. In practice, you should create your SQLite store separately and use WithStore() to avoid import cycles.
Example:
store, err := sqlite.NewWithDataSource("app.db")
if err != nil { /* handle */ }
mgr, err := synckit.NewManager(synckit.WithStore(store), ...)
func WithStore ¶ added in v0.15.0
func WithStore(s EventStore) ManagerOption
WithStore injects a pre-built EventStore.
Example (SeedingEvents) ¶
ExampleWithStore_seedingEvents demonstrates storing events before sync using a shared transport between two nodes (producer/consumer pattern).
package main
import (
"context"
"fmt"
"io"
"log/slog"
"github.com/c0deZ3R0/go-sync-kit/cursor"
"github.com/c0deZ3R0/go-sync-kit/event"
"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
"github.com/c0deZ3R0/go-sync-kit/synckit"
"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)
func main() {
ctx := context.Background()
silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
// Create a shared transport for both nodes
sharedTransport := memchan.New(16)
// Producer node: creates and stores an event
producerStore := memstore.New()
evt := event.New(
"evt-1", // event ID
"UserCreated", // event type
"user-123", // aggregate ID
[]byte(`{"name":"Alice"}`), // event data
)
version := cursor.IntegerCursor{Seq: 1}
if err := producerStore.Store(ctx, evt, version); err != nil {
panic(err)
}
producerNode, err := synckit.NewNode(
synckit.WithStore(producerStore),
synckit.WithTransport(sharedTransport),
synckit.WithManagerLogger(silentLogger),
)
if err != nil {
panic(err)
}
// Producer syncs and pushes the event to the shared transport
producerResult, err := producerNode.Sync(ctx)
if err != nil {
panic(err)
}
// Consumer node: pulls from the shared transport
consumerStore := memstore.New()
consumerNode, err := synckit.NewNode(
synckit.WithStore(consumerStore),
synckit.WithTransport(sharedTransport),
synckit.WithManagerLogger(silentLogger),
)
if err != nil {
panic(err)
}
consumerResult, err := consumerNode.Sync(ctx)
if err != nil {
panic(err)
}
// Show producer pushed 1 event, consumer pulled 1 event
fmt.Printf("producer: pushed=%d pulled=%d\n", producerResult.EventsPushed, producerResult.EventsPulled)
fmt.Printf("consumer: pushed=%d pulled=%d\n", consumerResult.EventsPushed, consumerResult.EventsPulled)
}
Output: producer: pushed=1 pulled=0 consumer: pushed=0 pulled=1
func WithSyncInterval ¶ added in v0.15.0
func WithSyncInterval(interval time.Duration) ManagerOption
WithSyncInterval sets the interval for automatic synchronization.
func WithTimeout ¶ added in v0.15.0
func WithTimeout(d time.Duration) ManagerOption
WithTimeout sets a per-sync timeout in SyncOptions.
func WithTracing ¶ added in v0.17.0
func WithTracing(tracer interface {
StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
RecordError(span trace.Span, err error, description string)
SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
}) ManagerOption
WithTracing sets a tracer for distributed tracing. The tracer must implement the tracing interface defined in SyncOptions.
func WithTransport ¶ added in v0.15.0
func WithTransport(t Transport) ManagerOption
WithTransport sets a pre-built Transport.
func WithValidation ¶ added in v0.15.0
func WithValidation() ManagerOption
WithValidation enables additional validation checks during sync operations.
type ManualReviewResolver ¶ added in v0.15.0
type ManualReviewResolver struct{ Reason string }
func (*ManualReviewResolver) Resolve ¶ added in v0.15.0
func (r *ManualReviewResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
type MatchConditions ¶ added in v0.15.0
type MatchConditions struct {
EventTypes []string `json:"event_types,omitempty" yaml:"event_types,omitempty"`
AggregateIDs []string `json:"aggregate_ids,omitempty" yaml:"aggregate_ids,omitempty"`
Fields []string `json:"fields,omitempty" yaml:"fields,omitempty"`
CustomMatch map[string]string `json:"custom_match,omitempty" yaml:"custom_match,omitempty"`
}
MatchConditions defines when a rule should be applied.
type MementoCaretaker ¶ added in v0.15.0
type MementoCaretaker interface {
// Save stores a resolution memento
Save(ctx context.Context, memento *ResolutionMemento) error
// Get retrieves a specific memento by ID
Get(ctx context.Context, id string) (*ResolutionMemento, error)
// List retrieves mementos based on criteria
List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)
// Delete removes a memento (with appropriate authorization)
Delete(ctx context.Context, id string) error
// GetAuditTrail returns the complete audit trail for an aggregate
GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
}
MementoCaretaker manages the storage and retrieval of resolution mementos. It acts as the caretaker in the Memento pattern.
type MementoCriteria ¶ added in v0.15.0
type MementoCriteria struct {
AggregateID string `json:"aggregate_id,omitempty"`
UserID string `json:"user_id,omitempty"`
ResolverName string `json:"resolver_name,omitempty"`
GroupName string `json:"group_name,omitempty"`
FromTime *time.Time `json:"from_time,omitempty"`
ToTime *time.Time `json:"to_time,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
}
MementoCriteria defines search criteria for querying mementos.
type MetricsCollector ¶
type MetricsCollector interface {
// RecordSyncDuration records how long a sync operation took
RecordSyncDuration(operation string, duration time.Duration)
// RecordSyncEvents records the number of events pushed and pulled
RecordSyncEvents(pushed, pulled int)
// RecordSyncErrors records sync operation errors by type
RecordSyncErrors(operation string, errorType string)
// RecordConflicts records the number of conflicts resolved
RecordConflicts(resolved int)
}
MetricsCollector provides hooks for collecting sync operation metrics
type MockMetricsCollector ¶
type MockMetricsCollector struct {
DurationCalls []struct {
Operation string
Duration time.Duration
}
EventCalls []struct {
Pushed, Pulled int
}
ErrorCalls []struct {
Operation, ErrorType string
}
ConflictCalls []struct {
Resolved int
}
// contains filtered or unexported fields
}
MockMetricsCollector implements MetricsCollector interface for testing Thread-safe: protects all internal slices with a mutex to avoid data races when tests record metrics from multiple goroutines.
func (*MockMetricsCollector) CopyConflictCalls ¶ added in v0.21.0
func (m *MockMetricsCollector) CopyConflictCalls() []struct{ Resolved int }
func (*MockMetricsCollector) CopyDurationCalls ¶ added in v0.21.0
func (m *MockMetricsCollector) CopyDurationCalls() []struct { Operation string Duration time.Duration }
Snapshot helpers for thread-safe reads in tests
func (*MockMetricsCollector) CopyErrorCalls ¶ added in v0.21.0
func (m *MockMetricsCollector) CopyErrorCalls() []struct{ Operation, ErrorType string }
func (*MockMetricsCollector) CopyEventCalls ¶ added in v0.21.0
func (m *MockMetricsCollector) CopyEventCalls() []struct{ Pushed, Pulled int }
func (*MockMetricsCollector) RecordConflicts ¶
func (m *MockMetricsCollector) RecordConflicts(resolved int)
func (*MockMetricsCollector) RecordSyncDuration ¶
func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
func (*MockMetricsCollector) RecordSyncErrors ¶
func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)
func (*MockMetricsCollector) RecordSyncEvents ¶
func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)
type NoOpMetricsCollector ¶
type NoOpMetricsCollector struct{}
NoOpMetricsCollector is a default implementation that does nothing
func (*NoOpMetricsCollector) RecordConflicts ¶
func (n *NoOpMetricsCollector) RecordConflicts(resolved int)
func (*NoOpMetricsCollector) RecordSyncDuration ¶
func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)
func (*NoOpMetricsCollector) RecordSyncErrors ¶
func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)
func (*NoOpMetricsCollector) RecordSyncEvents ¶
func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)
type Notification ¶
type Notification struct {
// Type of notification (data_changed, sync_requested, etc.)
Type string
// Source identifies where the change originated
Source string
// AggregateIDs that were affected (optional, for filtering)
AggregateIDs []string
// Metadata for additional context
Metadata map[string]interface{}
// Timestamp when the notification was created
Timestamp time.Time
}
Notification represents a real-time update notification
type NotificationFilter ¶
type NotificationFilter func(notification Notification) bool
NotificationFilter allows filtering which notifications to process
type NotificationHandler ¶
type NotificationHandler func(notification Notification) error
NotificationHandler processes incoming real-time notifications
type ObservableOption ¶ added in v0.15.0
type ObservableOption interface {
// contains filtered or unexported methods
}
ObservableOption provides configuration for ObservableResolver.
func WithGroupName ¶ added in v0.15.0
func WithGroupName(name string) ObservableOption
WithGroupName sets the group name for observability.
func WithMetricsCollector ¶ added in v0.15.0
func WithMetricsCollector(mc MetricsCollector) ObservableOption
WithMetricsCollector sets the metrics collector for the resolver.
func WithObservableLogger ¶ added in v0.15.0
func WithObservableLogger(logger *slog.Logger) ObservableOption
WithObservableLogger sets the logger for the resolver.
func WithResolutionHooks ¶ added in v0.15.0
func WithResolutionHooks(hooks *ResolutionHooks) ObservableOption
WithResolutionHooks sets the resolution hooks for the resolver.
type ObservableResolver ¶ added in v0.15.0
type ObservableResolver struct {
// contains filtered or unexported fields
}
ObservableResolver wraps any ConflictResolver to provide detailed metrics and logging. It implements the Observer pattern for monitoring conflict resolution performance.
func NewObservableResolver ¶ added in v0.15.0
func NewObservableResolver(resolver ConflictResolver, opts ...ObservableOption) *ObservableResolver
NewObservableResolver creates a new ObservableResolver that wraps an existing resolver.
func (*ObservableResolver) Resolve ¶ added in v0.15.0
func (or *ObservableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)
Resolve implements the ConflictResolver interface with added observability.
type Option ¶ added in v0.15.0
type Option interface {
// contains filtered or unexported methods
}
Option implements the Uber-style functional options pattern for construction.
func WithEventTypeRule ¶ added in v0.15.0
func WithEventTypeRule(name string, eventType string, resolver ConflictResolver) Option
WithEventTypeRule is a convenience helper for matching by event type.
func WithFallback ¶ added in v0.15.0
func WithFallback(r ConflictResolver) Option
WithFallback sets the required fallback ConflictResolver.
func WithLogger ¶ added in v0.15.0
WithLogger attaches an optional logger (opaque type to avoid dependency).
func WithRule ¶ added in v0.15.0
func WithRule(name string, matcher Spec, resolver ConflictResolver) Option
WithRule appends a rule with a custom matcher and resolver in insertion order.
func WithStateMachine ¶ added in v0.18.0
func WithStateMachine(options *statemachine.StatefulResolverOptions) Option
WithStateMachine is a convenience option for adding state machine capabilities to an existing DynamicResolver.
func WithValidator ¶ added in v0.15.0
WithValidator sets an optional validator for construction-time checks.
type ProjectionConfig ¶ added in v0.20.0
type ProjectionConfig struct {
// Runners are the projection runners to execute after successful sync
Runners []ProjectionRunner
// RunOnSync enables automatic projection execution after sync
RunOnSync bool
// MaxWorkers is the maximum number of concurrent projection workers
MaxWorkers int
// Timeout is the timeout for projection operations
Timeout time.Duration
}
ProjectionConfig holds configuration for projection support in SyncManager.
type ProjectionRunner ¶ added in v0.20.0
type ProjectionRunner interface {
// ApplySince applies all events since the last saved offset
ApplySince(ctx context.Context) (applied int, last Version, err error)
// ApplyBatch applies a specific batch of events directly
ApplyBatch(ctx context.Context, batch []EventWithVersion) error
}
ProjectionRunner is an interface that represents a projection runner. This is an alias to avoid import cycles with the projection package.
type RealtimeNotifier ¶
type RealtimeNotifier interface {
// Subscribe starts listening for real-time notifications
// The handler will be called when new data is available for sync
Subscribe(ctx context.Context, handler NotificationHandler) error
// Unsubscribe stops listening for notifications
Unsubscribe() error
// Notify sends a notification to connected clients (server-side)
Notify(ctx context.Context, notification Notification) error
// IsConnected returns true if the real-time connection is active
IsConnected() bool
// Close closes the notifier connection
Close() error
}
RealtimeNotifier provides real-time push notifications when data changes. This is separate from Transport to allow for different notification mechanisms (WebSockets, SSE, NATS, etc.) while keeping the sync logic agnostic.
type RealtimeSyncManager ¶
type RealtimeSyncManager interface {
SyncManager
// EnableRealtime starts real-time notifications
EnableRealtime(ctx context.Context) error
// DisableRealtime stops real-time notifications and falls back to polling
DisableRealtime() error
// IsRealtimeActive returns true if real-time notifications are active
IsRealtimeActive() bool
// GetConnectionStatus returns the current real-time connection status
GetConnectionStatus() ConnectionStatus
}
RealtimeSyncManager extends SyncManager with real-time capabilities
func NewRealtimeSyncManager ¶
func NewRealtimeSyncManager(store EventStore, transport Transport, options *RealtimeSyncOptions) RealtimeSyncManager
NewRealtimeSyncManager creates a sync manager with real-time capabilities
type RealtimeSyncOptions ¶
type RealtimeSyncOptions struct {
SyncOptions
// RealtimeNotifier for push notifications (optional)
RealtimeNotifier RealtimeNotifier
// NotificationFilter to process only relevant notifications
NotificationFilter NotificationFilter
// DisablePolling stops periodic sync when real-time is active
DisablePolling bool
// FallbackInterval for polling when real-time connection is lost
FallbackInterval time.Duration
// ReconnectBackoff strategy for reconnecting to real-time notifications
ReconnectBackoff BackoffStrategy
}
RealtimeSyncOptions extends SyncOptions with real-time capabilities
type ResolutionConfig ¶ added in v0.15.0
type ResolutionConfig struct {
Strategy string `json:"strategy" yaml:"strategy"`
Options map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
}
ResolutionConfig defines how conflicts should be resolved.
type ResolutionHooks ¶ added in v0.15.0
type ResolutionHooks struct {
OnConflict func(ctx context.Context, conflict *Conflict)
OnRuleMatched func(ctx context.Context, conflict *Conflict, rule *Rule, group *RuleGroup)
OnResolution func(ctx context.Context, result *ResolvedConflict, duration time.Duration)
OnFallback func(ctx context.Context, conflict *Conflict, group *RuleGroup)
OnError func(ctx context.Context, conflict *Conflict, err error)
}
ResolutionHooks provides callbacks for observing conflict resolution events.
type ResolutionMemento ¶ added in v0.15.0
type ResolutionMemento struct {
// Unique identifier for this resolution
ID string `json:"id"`
// Timestamp when the resolution occurred
Timestamp time.Time `json:"timestamp"`
// Conflict information (serialization-friendly)
EventType string `json:"event_type"`
AggregateID string `json:"aggregate_id"`
ChangedFields []string `json:"changed_fields,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
// Resolution result (serialization-friendly)
Decision string `json:"decision"`
Reasons []string `json:"reasons,omitempty"`
// Resolution metadata
ResolverName string `json:"resolver_name"`
RuleName string `json:"rule_name,omitempty"`
GroupName string `json:"group_name,omitempty"`
Context map[string]any `json:"context,omitempty"`
// Audit trail information
UserID string `json:"user_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Origin string `json:"origin,omitempty"`
// Performance metrics
ResolutionDuration time.Duration `json:"resolution_duration"`
// State before resolution (for rollback)
BeforeState *ConflictState `json:"before_state,omitempty"`
// State after resolution
AfterState *ConflictState `json:"after_state,omitempty"`
// For internal use - not serialized
OriginalConflict Conflict `json:"-"`
ResolvedConflict ResolvedConflict `json:"-"`
}
ResolutionMemento captures the complete state and history of a conflict resolution. It implements the Memento pattern to provide audit trails and rollback capabilities. This struct is designed to be JSON serializable by avoiding Event interface fields.
type ResolvedConflict ¶ added in v0.15.0
type ResolvedConflict = types.ResolvedConflict
type RetryConfig ¶
type RetryConfig struct {
// MaxAttempts is the maximum number of retry attempts
MaxAttempts int
// InitialDelay is the initial delay between retries
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries
MaxDelay time.Duration
// Multiplier is the factor by which the delay increases
Multiplier float64
}
RetryConfig configures the retry behavior for sync operations
type RetryPolicy ¶ added in v0.23.0
type RetryPolicy struct {
// Max is the maximum number of retry attempts (0 = no retries, <0 = unlimited).
Max int
// Base is the initial delay between retries (must be > 0 if retries enabled).
Base time.Duration
// Cap is the maximum delay between retries (must be >= Base).
Cap time.Duration
// Jitter adds randomness to retry delays to avoid thundering herd (recommended: true).
Jitter bool
}
RetryPolicy defines the backoff and retry behavior for sync operations.
type RollbackAnalysis ¶ added in v0.15.0
type RollbackAnalysis struct {
TargetMemento *ResolutionMemento `json:"target_memento"`
AffectedResolutions []*ResolutionMemento `json:"affected_resolutions"`
RollbackComplexity string `json:"rollback_complexity"`
RequiresReprocessing bool `json:"requires_reprocessing"`
Warnings []string `json:"warnings,omitempty"`
}
RollbackAnalysis provides information about the implications of rolling back a resolution.
type RollbackCapability ¶ added in v0.15.0
type RollbackCapability struct {
// contains filtered or unexported fields
}
RollbackCapability provides methods for analyzing rollback possibilities. Note: Actual rollback implementation would require coordination with the storage layer.
func NewRollbackCapability ¶ added in v0.15.0
func NewRollbackCapability(caretaker MementoCaretaker) *RollbackCapability
NewRollbackCapability creates a new rollback capability analyzer.
func (*RollbackCapability) AnalyzeRollback ¶ added in v0.15.0
func (rc *RollbackCapability) AnalyzeRollback(ctx context.Context, mementoID string) (*RollbackAnalysis, error)
AnalyzeRollback analyzes what would be involved in rolling back a specific resolution.
type Rule ¶ added in v0.15.0
type Rule struct {
Name string
Matcher Spec
Resolver ConflictResolver
}
Rule binds a matcher Specification to a ConflictResolver Strategy. Rules are evaluated in insertion order with first-match-wins semantics.
type RuleConfig ¶ added in v0.15.0
type RuleConfig struct {
Version string `json:"version" yaml:"version"`
Name string `json:"name" yaml:"name"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
// Global settings
Settings ConfigSettings `json:"settings,omitempty" yaml:"settings,omitempty"`
// Rule groups
Groups []GroupConfig `json:"groups" yaml:"groups"`
// Global rules (not in any group)
Rules []RuleConfigEntry `json:"rules,omitempty" yaml:"rules,omitempty"`
}
RuleConfig represents the complete configuration structure for conflict resolution rules.
type RuleConfigEntry ¶ added in v0.15.0
type RuleConfigEntry struct {
Name string `json:"name" yaml:"name"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Enabled *bool `json:"enabled,omitempty" yaml:"enabled,omitempty"`
Priority int `json:"priority,omitempty" yaml:"priority,omitempty"`
// Matching conditions
Conditions MatchConditions `json:"conditions" yaml:"conditions"`
// Resolution configuration
Resolution ResolutionConfig `json:"resolution" yaml:"resolution"`
// Metadata for custom extensions
Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}
RuleConfigEntry represents a single rule configuration.
type RuleGroup ¶ added in v0.15.0
RuleGroup implements the Composite pattern for organizing rules hierarchically. It allows grouping related rules under a common namespace and provides domain modularity without polluting the top-level rule order.
func NewRuleGroup ¶ added in v0.15.0
func NewRuleGroup(name string, opts ...RuleGroupOption) *RuleGroup
NewRuleGroup creates a new rule group with the given name.
func (*RuleGroup) AddSubGroup ¶ added in v0.15.0
AddSubGroup adds a sub-group to this group, allowing nested hierarchies.
func (*RuleGroup) GetAllRules ¶ added in v0.15.0
GetAllRules returns all rules from this group and its subgroups in order. This flattens the hierarchy for execution by DynamicResolver.
func (*RuleGroup) GetStats ¶ added in v0.15.0
func (rg *RuleGroup) GetStats() RuleGroupStats
GetStats returns statistics about the rule group structure.
func (*RuleGroup) ResolveInGroup ¶ added in v0.15.0
func (rg *RuleGroup) ResolveInGroup(ctx context.Context, conflict Conflict) (ResolvedConflict, bool, error)
ResolveInGroup attempts to resolve a conflict using only rules within this group. This allows for group-specific resolution before falling back to global rules.
type RuleGroupOption ¶ added in v0.15.0
type RuleGroupOption interface {
// contains filtered or unexported methods
}
RuleGroupOption provides configuration options for rule groups.
func WithDescription ¶ added in v0.15.0
func WithDescription(desc string) RuleGroupOption
WithDescription sets a description for the rule group.
func WithEnabled ¶ added in v0.15.0
func WithEnabled(enabled bool) RuleGroupOption
WithEnabled sets the enabled state of the rule group.
func WithGroupFallback ¶ added in v0.15.0
func WithGroupFallback(resolver ConflictResolver) RuleGroupOption
WithGroupFallback sets a fallback resolver specific to this group.
type RuleGroupStats ¶ added in v0.15.0
type RuleGroupStats struct {
Name string `json:"name"`
Enabled bool `json:"enabled"`
RuleCount int `json:"rule_count"` // Direct rules in this group
SubGroupCount int `json:"subgroup_count"` // Number of direct subgroups
TotalRules int `json:"total_rules"` // Total rules including subgroups
SubGroups []RuleGroupStats `json:"subgroups,omitempty"`
}
RuleGroupStats provides statistical information about rule group structure.
type Spec ¶ added in v0.15.0
Spec is a predicate used to match conflicts to rules. Combinators allow building complex match logic from small, testable pieces.
func AggregateIDMatches ¶ added in v0.15.0
AggregateIDMatches matches when the aggregate ID matches the pattern.
func AlwaysMatch ¶ added in v0.15.0
func AlwaysMatch() Spec
AlwaysMatch returns a spec that always matches.
func AndMatcher ¶ added in v0.15.0
AndMatcher combines multiple specs with AND logic.
func AnyFieldIn ¶ added in v0.15.0
AnyFieldIn matches when any of the conflict's changed fields is in the set.
func EventTypeIs ¶ added in v0.15.0
EventTypeIs matches a specific event type.
func FieldChanged ¶ added in v0.15.0
FieldChanged matches when a specific field has changed.
func MetadataEq ¶ added in v0.15.0
MetadataEq matches when Metadata[key] equals value.
type StateAwareSyncResult ¶ added in v0.18.0
type StateAwareSyncResult struct {
*SyncResult
// StateTransitions contains the state transitions that occurred during sync
StateTransitions []statemachine.StateTransition[SyncState]
// FinalState is the final state of the sync operation
FinalState SyncState
// StateChanges is the number of state transitions that occurred
StateChanges int
}
StateAwareSyncResult extends SyncResult with state machine information.
func NewStateAwareSyncResult ¶ added in v0.18.0
func NewStateAwareSyncResult(result *SyncResult, stateMachine statemachine.StateMachine[SyncState]) *StateAwareSyncResult
NewStateAwareSyncResult creates a StateAwareSyncResult from a regular SyncResult and state machine history.
type StatefulDynamicResolver ¶ added in v0.18.0
type StatefulDynamicResolver struct {
// Embed the existing DynamicResolver for backward compatibility
*DynamicResolver
// contains filtered or unexported fields
}
StatefulDynamicResolver extends DynamicResolver with state machine capabilities providing comprehensive workflow tracking, audit trails, and enhanced observability while maintaining full backward compatibility with existing code.
func NewStatefulDynamicResolver ¶ added in v0.18.0
func NewStatefulDynamicResolver(baseResolver *DynamicResolver, options *statemachine.StatefulResolverOptions) (*StatefulDynamicResolver, error)
NewStatefulDynamicResolver creates a new stateful dynamic resolver with the given options. It wraps an existing DynamicResolver to provide state machine capabilities while maintaining complete backward compatibility.
func NewStatefulDynamicResolverFromOptions ¶ added in v0.18.0
func NewStatefulDynamicResolverFromOptions(dynamicOpts []Option, statefulOpts *statemachine.StatefulResolverOptions) (*StatefulDynamicResolver, error)
NewStatefulDynamicResolverFromOptions creates a StatefulDynamicResolver directly from DynamicResolver options. This is a convenience function that creates the base DynamicResolver and wraps it with state machine capabilities.
func (*StatefulDynamicResolver) Configure ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) Configure(options *statemachine.StatefulResolverOptions) error
func (*StatefulDynamicResolver) GetActiveWorkflows ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) GetActiveWorkflows() []*statemachine.WorkflowStatus
func (*StatefulDynamicResolver) GetAuditTrail ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) GetAuditTrail(conflictID string) (*statemachine.ConflictAuditTrail, error)
func (*StatefulDynamicResolver) GetCurrentState ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) GetCurrentState() statemachine.ConflictResolutionState
func (*StatefulDynamicResolver) GetPerformanceMetrics ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) GetPerformanceMetrics() *statemachine.ResolverPerformanceMetrics
func (*StatefulDynamicResolver) GetStateHistory ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) GetStateHistory() []statemachine.StateTransition[statemachine.ConflictResolutionState]
func (*StatefulDynamicResolver) GetWorkflowByID ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) GetWorkflowByID(conflictID string) (*statemachine.ConflictWorkflow, bool)
func (*StatefulDynamicResolver) GetWorkflowManager ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) GetWorkflowManager() *statemachine.WorkflowManager
func (*StatefulDynamicResolver) IsStateMachineEnabled ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) IsStateMachineEnabled() bool
func (*StatefulDynamicResolver) Resolve ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) Resolve(ctx context.Context, c Conflict) (ResolvedConflict, error)
Resolve implements the ConflictResolver interface with enhanced state machine capabilities. This method provides full backward compatibility while adding workflow tracking and audit trails when state machine features are enabled.
func (*StatefulDynamicResolver) SubscribeToStateChanges ¶ added in v0.18.0
func (sdr *StatefulDynamicResolver) SubscribeToStateChanges(observer statemachine.StateObserver[statemachine.ConflictResolutionState])
type StatefulRealtimeNotifier ¶ added in v0.18.0
type StatefulRealtimeNotifier interface {
RealtimeNotifier
statemachine.StatefulTransport
}
StatefulRealtimeNotifier extends RealtimeNotifier with state machine capabilities
type StatefulRealtimeSyncManager ¶ added in v0.18.0
type StatefulRealtimeSyncManager struct {
// contains filtered or unexported fields
}
StatefulRealtimeSyncManager enhances RealtimeSyncManager with transport state awareness
func NewStatefulRealtimeSyncManager ¶ added in v0.18.0
func NewStatefulRealtimeSyncManager(store EventStore, transport Transport, options *StatefulRealtimeSyncOptions) (*StatefulRealtimeSyncManager, error)
NewStatefulRealtimeSyncManager creates a realtime sync manager with transport state awareness
func (*StatefulRealtimeSyncManager) CanReceiveRealtimeData ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) CanReceiveRealtimeData() bool
CanReceiveRealtimeData returns true if the transport can receive data in the current state
func (*StatefulRealtimeSyncManager) CanSendRealtimeData ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) CanSendRealtimeData() bool
CanSendRealtimeData returns true if the transport can send data in the current state
func (StatefulRealtimeSyncManager) Close ¶ added in v0.18.0
func (rsm StatefulRealtimeSyncManager) Close() error
Close extends the base close method to handle real-time resources
func (StatefulRealtimeSyncManager) DisableRealtime ¶ added in v0.18.0
func (rsm StatefulRealtimeSyncManager) DisableRealtime() error
DisableRealtime stops real-time notifications
func (StatefulRealtimeSyncManager) EnableRealtime ¶ added in v0.18.0
EnableRealtime starts real-time notifications
func (*StatefulRealtimeSyncManager) EnhancedClose ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) EnhancedClose() error
EnhancedClose extends the base close with transport state management cleanup
func (*StatefulRealtimeSyncManager) EnhancedDisableRealtime ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) EnhancedDisableRealtime() error
EnhancedDisableRealtime stops real-time notifications with state machine integration
func (*StatefulRealtimeSyncManager) EnhancedEnableRealtime ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) EnhancedEnableRealtime(ctx context.Context) error
EnhancedEnableRealtime starts real-time notifications with state machine integration
func (StatefulRealtimeSyncManager) GetConnectionStatus ¶ added in v0.18.0
func (rsm StatefulRealtimeSyncManager) GetConnectionStatus() ConnectionStatus
GetConnectionStatus returns the current connection status
func (*StatefulRealtimeSyncManager) GetEnhancedConnectionStatus ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) GetEnhancedConnectionStatus() map[string]interface{}
GetEnhancedConnectionStatus returns enhanced connection status including transport state
func (*StatefulRealtimeSyncManager) GetTransportConnectionMetadata ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) GetTransportConnectionMetadata() map[string]interface{}
GetTransportConnectionMetadata returns metadata about the transport connection
func (*StatefulRealtimeSyncManager) GetTransportConnectionState ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) GetTransportConnectionState() statemachine.TransportState
GetTransportConnectionState returns the current transport connection state
func (StatefulRealtimeSyncManager) IsRealtimeActive ¶ added in v0.18.0
func (rsm StatefulRealtimeSyncManager) IsRealtimeActive() bool
IsRealtimeActive returns true if real-time notifications are active
func (*StatefulRealtimeSyncManager) IsTransportHealthy ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) IsTransportHealthy() bool
IsTransportHealthy returns true if the transport is in a healthy state
func (*StatefulRealtimeSyncManager) MonitorTransportHealth ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) MonitorTransportHealth(ctx context.Context, checkInterval time.Duration)
MonitorTransportHealth continuously monitors transport health and manages reconnections
func (*StatefulRealtimeSyncManager) SubscribeToTransportStateChanges ¶ added in v0.18.0
func (srsm *StatefulRealtimeSyncManager) SubscribeToTransportStateChanges(handler func(statemachine.StateTransition[statemachine.TransportState])) error
SubscribeToTransportStateChanges allows external components to listen to transport state changes
type StatefulRealtimeSyncOptions ¶ added in v0.18.0
type StatefulRealtimeSyncOptions struct {
RealtimeSyncOptions
// TransportStateConfig for configuring transport state management
TransportStateConfig statemachine.TransportStateManagerConfig
// EnableTransportStateLogging enables detailed transport state logging
EnableTransportStateLogging bool
// ConnectionTimeout for transport connection attempts
ConnectionTimeout time.Duration
// MaxConnectionAttempts before marking transport as failed
MaxConnectionAttempts int
// ConnectionRetryDelay between connection attempts
ConnectionRetryDelay time.Duration
}
StatefulRealtimeSyncOptions extends RealtimeSyncOptions with state machine configuration
type SyncManager
deprecated
type SyncManager interface {
// Sync performs a bidirectional sync operation
Sync(ctx context.Context) (*SyncResult, error)
// Push sends local events to remote
Push(ctx context.Context) (*SyncResult, error)
// Pull retrieves remote events to local
Pull(ctx context.Context) (*SyncResult, error)
// StartAutoSync begins automatic synchronization at the configured interval
StartAutoSync(ctx context.Context) error
// StopAutoSync stops automatic synchronization
StopAutoSync() error
// Subscribe to sync events (optional)
Subscribe(handler func(*SyncResult)) error
// Close shuts down the sync manager
Close() error
}
SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.
Deprecated: SyncManager is deprecated. Use SyncNode instead, which provides the same functionality with a cleaner API. See node.go for the preferred interface and NewNode() for the constructor.
func New ¶ added in v0.23.0
func New(cfg Config) (SyncManager, error)
New is the canonical constructor for creating a SyncManager from a Config. It validates the configuration and wires up the manager using the builder internally.
This is the recommended entrypoint for applications. Functional options (WithX) remain supported for advanced use cases and backward compatibility.
Example:
cfg := synckit.Config{
Store: store,
Transport: transport,
Timeout: 30 * time.Second,
}
mgr, err := synckit.New(cfg)
func NewManager
deprecated
added in
v0.15.0
func NewManager(opts ...ManagerOption) (SyncManager, error)
NewManager constructs a SyncManager using functional options on top of the existing builder. It keeps your builder for advanced use while offering a concise, discoverable API.
Deprecated: NewManager is deprecated. Use NewNode() instead, which provides the same functionality with a cleaner API. See node.go for details.
func NewSyncManager ¶
func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions, logger *slog.Logger, projectionConfig *ProjectionConfig) SyncManager
NewSyncManager creates a new sync manager with the provided components
type SyncManagerBuilder ¶
type SyncManagerBuilder struct {
// contains filtered or unexported fields
}
SyncManagerBuilder provides a fluent interface for constructing SyncManager instances.
func NewSyncManagerBuilder ¶
func NewSyncManagerBuilder() *SyncManagerBuilder
NewSyncManagerBuilder creates a new builder with default options.
func (*SyncManagerBuilder) Build ¶
func (b *SyncManagerBuilder) Build() (SyncManager, error)
Build creates a new SyncManager instance with the configured options.
func (*SyncManagerBuilder) Reset ¶
func (b *SyncManagerBuilder) Reset() *SyncManagerBuilder
Reset clears the builder, allowing reuse.
func (*SyncManagerBuilder) WithBatchSize ¶
func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder
WithBatchSize sets the batch size for sync operations.
func (*SyncManagerBuilder) WithCompression ¶
func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder
WithCompression enables data compression during transport.
func (*SyncManagerBuilder) WithConflictResolver ¶
func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder
WithConflictResolver sets the conflict resolution strategy.
func (*SyncManagerBuilder) WithFilter ¶
func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder
WithFilter sets an event filter function.
func (*SyncManagerBuilder) WithHealthChecker ¶ added in v0.17.0
func (b *SyncManagerBuilder) WithHealthChecker(checker interface{}) *SyncManagerBuilder
WithHealthChecker sets a health checker for monitoring sync-kit component health.
func (*SyncManagerBuilder) WithLogger ¶ added in v0.12.0
func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder
WithLogger sets a custom logger for the SyncManager.
func (*SyncManagerBuilder) WithMetricsCollector ¶ added in v0.17.0
func (b *SyncManagerBuilder) WithMetricsCollector(collector MetricsCollector) *SyncManagerBuilder
WithMetricsCollector sets a metrics collector for observability.
func (*SyncManagerBuilder) WithProjectionMaxWorkers ¶ added in v0.20.0
func (b *SyncManagerBuilder) WithProjectionMaxWorkers(workers int) *SyncManagerBuilder
WithProjectionMaxWorkers sets the maximum number of concurrent projection workers.
func (*SyncManagerBuilder) WithProjectionTimeout ¶ added in v0.20.0
func (b *SyncManagerBuilder) WithProjectionTimeout(timeout time.Duration) *SyncManagerBuilder
WithProjectionTimeout sets the timeout for projection operations.
func (*SyncManagerBuilder) WithProjections ¶ added in v0.20.0
func (b *SyncManagerBuilder) WithProjections(runners ...ProjectionRunner) *SyncManagerBuilder
WithProjections adds projection runners to execute after successful sync.
func (*SyncManagerBuilder) WithProjectionsOnSync ¶ added in v0.20.0
func (b *SyncManagerBuilder) WithProjectionsOnSync(enabled bool) *SyncManagerBuilder
WithProjectionsOnSync enables automatic projection execution after sync.
func (*SyncManagerBuilder) WithPullOnly ¶
func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder
WithPullOnly configures the SyncManager to only pull events.
func (*SyncManagerBuilder) WithPushOnly ¶
func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder
WithPushOnly configures the SyncManager to only push events.
func (*SyncManagerBuilder) WithStore ¶
func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder
WithStore sets the EventStore for the SyncManager.
func (*SyncManagerBuilder) WithSyncInterval ¶
func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder
WithSyncInterval sets the interval for automatic synchronization.
func (*SyncManagerBuilder) WithTimeout ¶
func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder
WithTimeout sets the maximum duration for sync operations.
func (*SyncManagerBuilder) WithTracer ¶ added in v0.17.0
func (b *SyncManagerBuilder) WithTracer(tracer interface { StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span) StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span) StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span) StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span) RecordError(span trace.Span, err error, description string) SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int) }) *SyncManagerBuilder
WithTracer sets a tracer for distributed tracing.
func (*SyncManagerBuilder) WithTransport ¶
func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder
WithTransport sets the Transport for the SyncManager.
func (*SyncManagerBuilder) WithValidation ¶
func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder
WithValidation enables additional validation checks during sync operations.
type SyncNode ¶ added in v0.22.0
type SyncNode = SyncManager
SyncNode is the preferred façade for creating and managing sync participants. It is currently a type alias for SyncManager but may evolve into a dedicated struct in future releases.
func NewHTTPClientNode ¶ added in v0.22.0
func NewHTTPClientNode(store EventStore, transport Transport) (SyncNode, error)
NewHTTPClientNode creates a SyncNode configured as an HTTP client. The caller should provide an appropriate HTTP client transport configured for the server URL.
Example usage:
store := sqlite.New("client.db")
transport := httptransport.NewTransport("http://localhost:8080/sync", nil, nil, nil)
node, err := synckit.NewHTTPClientNode(store, transport)
func NewHTTPServerNode ¶ added in v0.22.0
func NewHTTPServerNode(store EventStore, transport Transport) (SyncNode, error)
NewHTTPServerNode creates a SyncNode configured with the provided store and transport. The caller should provide an appropriate HTTP server transport implementation.
Example usage:
store := sqlite.New("app.db")
transport := httptransport.NewTransport("", nil, nil, nil) // Configure for server use
node, err := synckit.NewHTTPServerNode(store, transport)
func NewInMemoryNode ¶ added in v0.22.0
func NewInMemoryNode(store EventStore, transport Transport) (SyncNode, error)
NewInMemoryNode creates a SyncNode with the provided in-memory store and transport. This is perfect for development, testing, and examples where no persistence is needed.
Example usage:
store := memstore.New() transport := memchan.New(16) node, err := synckit.NewInMemoryNode(store, transport)
func NewNode ¶ added in v0.22.0
func NewNode(opts ...ManagerOption) (SyncNode, error)
NewNode mirrors NewManager to construct a SyncNode. Use this instead of NewManager in new code.
Example (InMemory) ¶
ExampleNewNode_inMemory demonstrates creating a SyncNode with in-memory storage and performing a sync operation in a single-process environment.
package main
import (
"context"
"fmt"
"io"
"log/slog"
"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
"github.com/c0deZ3R0/go-sync-kit/synckit"
"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)
func main() {
ctx := context.Background()
// Create an in-memory event store
store := memstore.New()
// Create an in-memory channel transport with capacity for 16 events
transport := memchan.New(16)
// Create a silent logger for clean example output
silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
// Create a SyncNode with the store and transport
node, err := synckit.NewNode(
synckit.WithStore(store),
synckit.WithTransport(transport),
synckit.WithManagerLogger(silentLogger),
)
if err != nil {
panic(err)
}
// Perform an initial sync (no events yet)
result, err := node.Sync(ctx)
if err != nil {
panic(err)
}
fmt.Printf("Events pulled: %d, Events pushed: %d\n", result.EventsPulled, result.EventsPushed)
}
Output: Events pulled: 0, Events pushed: 0
type SyncOptions ¶
type SyncOptions struct {
// LastCursorLoader loads the last saved cursor for cursor-based syncs
LastCursorLoader func() cursor.Cursor
// CursorSaver saves the latest cursor after a successful sync
CursorSaver func(cursor.Cursor) error
// PushOnly indicates this client should only push events, not pull
PushOnly bool
// PullOnly indicates this client should only pull events, not push
PullOnly bool
// ConflictResolver to use for handling conflicts
ConflictResolver ConflictResolver
// Filter can be used to sync only specific events
Filter func(Event) bool
// BatchSize limits how many events to sync at once
BatchSize int
// SyncInterval for automatic periodic sync (0 disables)
SyncInterval time.Duration
// RetryConfig configures retry behavior for sync operations
RetryConfig *RetryConfig
// EnableValidation enables additional validation checks during sync
EnableValidation bool
// Timeout sets the maximum duration for sync operations
Timeout time.Duration
// EnableCompression enables data compression during transport
EnableCompression bool
// MetricsCollector for observability hooks (optional)
MetricsCollector MetricsCollector
// Tracer for distributed tracing (optional)
Tracer interface {
// StartSyncOperation starts a new span for a sync operation
StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
// StartTransportOperation starts a new span for transport operations
StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
// StartStorageOperation starts a new span for storage operations
StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
// StartConflictResolution starts a new span for conflict resolution
StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
// RecordError records an error on a span
RecordError(span trace.Span, err error, description string)
// SetSyncResult sets sync result attributes on a span
SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
}
}
SyncOptions configures the synchronization behavior
type SyncResult ¶
type SyncResult struct {
// EventsPushed is the number of events sent to remote
EventsPushed int
// EventsPulled is the number of events received from remote
EventsPulled int
// ConflictsResolved is the number of conflicts that were resolved
ConflictsResolved int
// Errors contains any non-fatal errors that occurred during sync
Errors []error
// StartTime is when the sync operation began
StartTime time.Time
// Duration is how long the sync took
Duration time.Duration
// LocalVersion is the local version after sync
LocalVersion Version
// RemoteVersion is the remote version after sync
RemoteVersion Version
}
SyncResult provides information about a completed sync operation
type SyncState ¶ added in v0.18.0
type SyncState int
SyncState represents the state of a sync operation.
const ( // SyncIdle indicates the sync manager is ready for new operations SyncIdle SyncState = iota // SyncInitializing indicates a sync operation is being set up SyncInitializing // SyncPushing indicates local events are being sent to remote SyncPushing // SyncPulling indicates remote events are being retrieved SyncPulling // SyncResolvingConflicts indicates conflicts are being processed SyncResolvingConflicts // SyncCompleted indicates the sync operation completed successfully SyncCompleted // SyncFailed indicates the sync operation failed SyncFailed // SyncCancelled indicates the sync operation was cancelled SyncCancelled )
func (SyncState) CanAutoSync ¶ added in v0.18.0
CanAutoSync returns true if auto-sync operations are allowed in this state.
func (SyncState) IsActive ¶ added in v0.18.0
IsActive returns true if this state represents an active sync operation.
func (SyncState) IsTerminal ¶ added in v0.18.0
IsTerminal returns true if this state represents the end of a sync operation.
type SyncStateObserver ¶ added in v0.18.0
type SyncStateObserver struct {
// OnStateChange is called when a sync state transition succeeds
OnStateChange func(from, to SyncState, duration time.Duration, metadata map[string]interface{})
// OnStateChangeError is called when a sync state transition fails
OnStateChangeError func(from, to SyncState, err error, metadata map[string]interface{})
}
SyncStateObserver provides a convenient way to observe sync state changes.
func (*SyncStateObserver) OnTransition ¶ added in v0.18.0
func (o *SyncStateObserver) OnTransition(transition statemachine.StateTransition[SyncState])
OnTransition implements the StateObserver interface.
func (*SyncStateObserver) OnTransitionFailed ¶ added in v0.18.0
func (o *SyncStateObserver) OnTransitionFailed(from, to SyncState, err error, metadata map[string]interface{})
OnTransitionFailed implements the StateObserver interface.
type TestEvent ¶
type TestEvent struct{}
TestEvent implements Event interface for testing
func (*TestEvent) AggregateID ¶
type TestEventStore ¶
type TestEventStore struct {
// contains filtered or unexported fields
}
TestEventStore implements a simple in-memory event store for testing
func (*TestEventStore) Close ¶
func (m *TestEventStore) Close() error
func (*TestEventStore) LatestVersion ¶
func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)
func (*TestEventStore) Load ¶
func (m *TestEventStore) Load(_ context.Context, _ Version, _ ...Filter) ([]EventWithVersion, error)
func (*TestEventStore) LoadByAggregate ¶
func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version, _ ...Filter) ([]EventWithVersion, error)
func (*TestEventStore) ParseVersion ¶
type TestTransport ¶
type TestTransport struct {
// contains filtered or unexported fields
}
TestTransport implements a simple transport for testing
func (*TestTransport) Close ¶
func (m *TestTransport) Close() error
func (*TestTransport) GetLatestVersion ¶
func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)
func (*TestTransport) Pull ¶
func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)
func (*TestTransport) Push ¶
func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error
func (*TestTransport) Subscribe ¶
func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error
type TestVersion ¶
type TestVersion struct{}
TestVersion implements Version interface for testing
func (*TestVersion) Compare ¶
func (m *TestVersion) Compare(_ Version) int
func (*TestVersion) IsZero ¶
func (m *TestVersion) IsZero() bool
func (*TestVersion) String ¶
func (m *TestVersion) String() string
type TimeoutSettings ¶ added in v0.15.0
type TimeoutSettings struct {
ResolutionTimeoutMs int `json:"resolution_timeout_ms,omitempty" yaml:"resolution_timeout_ms,omitempty"`
ManualReviewTimeoutMs int `json:"manual_review_timeout_ms,omitempty" yaml:"manual_review_timeout_ms,omitempty"`
}
TimeoutSettings contains timeout configurations.
type Transport ¶
Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.
This is now an alias to the stabilized interface in synckit/types.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package codec provides event encoding and decoding support.
|
Package codec provides event encoding and decoding support. |
|
Package statemachine provides state machine and snapshot support for event sourcing.
|
Package statemachine provides state machine and snapshot support for event sourcing. |
|
Package types defines core interfaces and types for the sync kit.
|
Package types defines core interfaces and types for the sync kit. |