synckit

package
v0.24.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2025 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package synckit - aliases for backward compatibility and future expansion

Package synckit provides a generic event-driven synchronization system for distributed applications.

Overview

Synckit enables offline-first architectures with conflict resolution and pluggable storage backends. Applications import this single package to access all core types and interfaces:

import "github.com/c0deZ3R0/go-sync-kit/synckit"

Core Concepts

The system is built around four key abstractions:

1. Event: Domain events representing state changes 2. Version: Point-in-time snapshots for ordering and conflict detection 3. Store: Local event persistence (EventStore interface) 4. Transport: Network communication layer (Transport interface)

Usage Example

// Define your event type
type UserEvent struct {
	id        string
	eventType string
	aggID     string
	data      interface{}
	meta      map[string]interface{}
}

func (e *UserEvent) ID() string                       { return e.id }
func (e *UserEvent) Type() string                     { return e.eventType }
func (e *UserEvent) AggregateID() string              { return e.aggID }
func (e *UserEvent) Data() interface{}                { return e.data }
func (e *UserEvent) Metadata() map[string]interface{} { return e.meta }

// Use a storage backend
store, _ := memstore.New()

// Configure a transport
transport := httptransport.NewClient("https://api.example.com")

// Create a sync node
node := synckit.NewNode(store, transport)

// Perform synchronization
result, err := node.Sync(context.Background())
if err != nil {
	log.Fatal(err)
}
log.Printf("Synced: %d pushed, %d pulled", result.EventsPushed, result.EventsPulled)

Conflict Resolution

When local and remote changes collide, implement a ConflictResolver:

type LastWriteWinsResolver struct{}

func (r *LastWriteWinsResolver) Resolve(ctx context.Context, c synckit.Conflict) (synckit.ResolvedConflict, error) {
	// Compare versions and pick the latest
	if c.Remote.Version.Compare(c.Local.Version) > 0 {
		return synckit.ResolvedConflict{
			ResolvedEvents: []synckit.EventWithVersion{c.Remote},
			Decision:       "remote-wins",
		}, nil
	}
	return synckit.ResolvedConflict{
		ResolvedEvents: []synckit.EventWithVersion{c.Local},
		Decision:       "local-wins",
	}, nil
}

Architecture

Synckit supports: - Offline-first: local persistence with eventual consistency - Pluggable stores: in-memory, SQLite, PostgreSQL, BadgerDB - Pluggable transports: HTTP, WebSockets, SSE, RabbitMQ - Conflict resolution: custom strategies per domain - Observability: structured logging, metrics, tracing

API surface

Import synckit to access all core types and interfaces:

import "github.com/c0deZ3R0/go-sync-kit/synckit"

Core types (aliased from synckit/types): - Event: Represents a syncable event - Version: Point-in-time snapshot for ordering and conflict detection - EventWithVersion: Pairs an event with its version - Conflict: Context for resolving detected conflicts - ResolvedConflict: Resolution decision and follow-up data - ConflictResolver: Strategy interface for conflict resolution

Core interfaces: - EventStore: Persistence for events (see sync.go) - Transport: Network communication layer (see sync.go) - CursorTransport: Transport with cursor-based pagination (see sync.go)

Implementor guidance - EventStore: implement Store, Load, LoadByAggregate, LatestVersion, ParseVersion, Close - Transport: implement Push, Pull, GetLatestVersion, Subscribe, Close

See subpackages for concrete implementations: - storage/memstore: In-memory store for testing - storage/sqlite: SQLite-based persistent store - storage/postgres: PostgreSQL-based persistent store - transport/httptransport: HTTP-based transport - transport/sse: Server-Sent Events transport - transport/rabbitmq: RabbitMQ-based transport

Package sync provides a generic event-driven synchronization system for distributed applications. It supports offline-first architectures with conflict resolution and pluggable storage backends.

Example

Example demonstrates the basic sync workflow: store events locally, configure a node, and sync.

package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"

	"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
	"github.com/c0deZ3R0/go-sync-kit/synckit"
	"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)

func main() {
	ctx := context.Background()

	// 1. Create storage and transport
	store := memstore.New()
	transport := memchan.New(16)
	silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))

	// 2. Create a node
	node, err := synckit.NewNode(
		synckit.WithStore(store),
		synckit.WithTransport(transport),
		synckit.WithManagerLogger(silentLogger),
	)
	if err != nil {
		panic(err)
	}

	// 3. Perform a sync cycle
	result, err := node.Sync(ctx)
	if err != nil {
		panic(err)
	}

	// 4. Inspect results
	if result.EventsPushed == 0 && result.EventsPulled == 0 {
		fmt.Println("Sync complete: no events to sync")
	}

}
Output:

Sync complete: no events to sync

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewSyncStateMachine added in v0.18.0

func NewSyncStateMachine() (statemachine.StateMachine[SyncState], error)

NewSyncStateMachine creates a new state machine for sync operations.

func SyncStateTransitionRules added in v0.18.0

func SyncStateTransitionRules() statemachine.TransitionRules[SyncState]

SyncStateTransitionRules defines the valid transitions for sync operations.

Types

type AdditiveMergeResolver added in v0.15.0

type AdditiveMergeResolver struct{}

func (*AdditiveMergeResolver) Resolve added in v0.15.0

type AuditableOption added in v0.15.0

type AuditableOption interface {
	// contains filtered or unexported methods
}

AuditableOption provides configuration options for AuditableResolver.

func WithAuditLogger added in v0.15.0

func WithAuditLogger(logger Logger) AuditableOption

WithAuditLogger sets a logger for the auditable resolver.

func WithOriginExtractor added in v0.15.0

func WithOriginExtractor(extractor func(context.Context) string) AuditableOption

WithOriginExtractor sets a function to extract origin information from context.

func WithSessionIDExtractor added in v0.15.0

func WithSessionIDExtractor(extractor func(context.Context) string) AuditableOption

WithSessionIDExtractor sets a function to extract session ID from context.

func WithUserIDExtractor added in v0.15.0

func WithUserIDExtractor(extractor func(context.Context) string) AuditableOption

WithUserIDExtractor sets a function to extract user ID from context.

type AuditableResolver added in v0.15.0

type AuditableResolver struct {
	// contains filtered or unexported fields
}

AuditableResolver wraps any ConflictResolver to provide audit trail capabilities.

func NewAuditableResolver added in v0.15.0

func NewAuditableResolver(resolver ConflictResolver, caretaker MementoCaretaker, opts ...AuditableOption) *AuditableResolver

NewAuditableResolver creates a new auditable resolver that wraps an existing resolver.

func (*AuditableResolver) GetAuditTrail added in v0.15.0

func (ar *AuditableResolver) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)

GetAuditTrail retrieves the audit trail for a specific aggregate.

func (*AuditableResolver) Resolve added in v0.15.0

func (ar *AuditableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)

Resolve implements ConflictResolver interface with audit trail creation.

type BackoffStrategy

type BackoffStrategy interface {
	// NextDelay returns the delay before the next reconnection attempt
	NextDelay(attempt int) time.Duration

	// Reset resets the backoff strategy after a successful connection
	Reset()
}

BackoffStrategy defines how to handle reconnection delays

type BasicValidator added in v0.15.0

type BasicValidator struct{}

BasicValidator provides basic configuration validation.

func (*BasicValidator) Name added in v0.15.0

func (v *BasicValidator) Name() string

func (*BasicValidator) Validate added in v0.15.0

func (v *BasicValidator) Validate(config *RuleConfig) error

type CompositeOption added in v0.15.0

type CompositeOption interface {
	// contains filtered or unexported methods
}

CompositeOption provides configuration options for CompositeResolver.

func WithCompositeLogger added in v0.15.0

func WithCompositeLogger(logger Logger) CompositeOption

WithCompositeLogger sets a logger for the composite resolver.

type CompositeResolver added in v0.15.0

type CompositeResolver struct {
	// contains filtered or unexported fields
}

CompositeResolver implements ConflictResolver and acts as a root container for multiple rule groups with group-aware resolution.

func NewCompositeResolver added in v0.15.0

func NewCompositeResolver(fallback ConflictResolver, opts ...CompositeOption) *CompositeResolver

NewCompositeResolver creates a new composite resolver that manages multiple rule groups.

func (*CompositeResolver) AddGroup added in v0.15.0

func (cr *CompositeResolver) AddGroup(group *RuleGroup) *CompositeResolver

AddGroup adds a rule group to the composite resolver.

func (*CompositeResolver) GetAllGroups added in v0.15.0

func (cr *CompositeResolver) GetAllGroups() []*RuleGroup

GetAllGroups returns all rule groups managed by this composite resolver.

func (*CompositeResolver) GetGroupStats added in v0.15.0

func (cr *CompositeResolver) GetGroupStats() []RuleGroupStats

GetGroupStats returns statistics for all managed rule groups.

func (*CompositeResolver) Resolve added in v0.15.0

func (cr *CompositeResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)

Resolve implements ConflictResolver interface for CompositeResolver. It tries each group in order, then falls back to the global fallback.

type Config added in v0.23.0

type Config struct {
	// Store provides local event persistence (required).
	Store EventStore

	// Transport handles network communication (optional for local-only scenarios).
	Transport Transport

	// Logger for structured logging (optional; defaults to slog.Default()).
	Logger *slog.Logger

	// Cursor specifies the versioning strategy (Integer or Vector).
	// Default: CursorInteger.
	Cursor CursorMode

	// Retry defines backoff/retry policy for transient failures.
	// Default: no retries.
	Retry RetryPolicy

	// Resolvers is the conflict resolution registry (optional).
	// If nil, uses Last-Write-Wins as default resolver.
	// Note: ResolverRegistry type is forward-compatible placeholder; use ConflictResolver for now.
	Resolvers ConflictResolver

	// Timeout is the maximum duration for sync operations (0 = no timeout).
	// Default: 0 (no timeout).
	Timeout time.Duration

	// BatchSize limits the number of events to sync at once.
	// Default: 100.
	BatchSize int

	// SyncInterval for automatic periodic sync (0 = disabled).
	// Default: 0 (manual sync only).
	SyncInterval time.Duration

	// PushOnly restricts sync to only pushing local events (no pull).
	// Default: false.
	PushOnly bool

	// PullOnly restricts sync to only pulling remote events (no push).
	// Default: false.
	PullOnly bool

	// EnableValidation enables additional validation checks during sync.
	// Default: false.
	EnableValidation bool

	// EnableCompression enables data compression during transport.
	// Default: false.
	EnableCompression bool

	// Filter is an optional event filter function.
	// Events are synced only if Filter returns true.
	Filter func(Event) bool

	// MetricsCollector for observability hooks (optional).
	MetricsCollector MetricsCollector

	// Tracer for distributed tracing (optional).
	Tracer interface {
		StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
		StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
		StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
		StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
		RecordError(span trace.Span, err error, description string)
		SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
	}
}

Config provides a canonical configuration structure for creating a SyncManager. It offers a declarative alternative to functional options, with built-in validation.

Example usage:

cfg := synckit.Config{
    Store:     myStore,
    Transport: myTransport,
    Logger:    slog.Default(),
    Cursor:    synckit.CursorInteger,
    Retry: synckit.RetryPolicy{
        Max:    3,
        Base:   100 * time.Millisecond,
        Cap:    5 * time.Second,
        Jitter: true,
    },
    Timeout: 30 * time.Second,
}

mgr, err := synckit.New(cfg)
if err != nil {
    log.Fatal(err)
}

func (*Config) Validate added in v0.23.0

func (c *Config) Validate() error

Validate checks the configuration for correctness and returns an error if invalid. It enforces required fields, sane timeouts, and retry bounds.

type ConfigLoader added in v0.15.0

type ConfigLoader struct {
	// contains filtered or unexported fields
}

ConfigLoader provides dynamic loading and validation of conflict resolution rules from YAML or JSON configuration files with runtime update capabilities.

func NewConfigLoader added in v0.15.0

func NewConfigLoader(opts ...ConfigLoaderOption) *ConfigLoader

NewConfigLoader creates a new configuration loader.

func (*ConfigLoader) BuildDynamicResolver added in v0.15.0

func (cl *ConfigLoader) BuildDynamicResolver() (*DynamicResolver, error)

BuildDynamicResolver creates a DynamicResolver from the current configuration.

func (*ConfigLoader) GetCurrentConfig added in v0.15.0

func (cl *ConfigLoader) GetCurrentConfig() *RuleConfig

GetCurrentConfig returns the current configuration.

func (*ConfigLoader) LoadFromBytes added in v0.15.0

func (cl *ConfigLoader) LoadFromBytes(data []byte, format string) error

LoadFromBytes loads configuration from raw bytes.

func (*ConfigLoader) LoadFromFile added in v0.15.0

func (cl *ConfigLoader) LoadFromFile(filepath string) error

LoadFromFile loads configuration from a YAML or JSON file.

type ConfigLoaderOption added in v0.15.0

type ConfigLoaderOption interface {
	// contains filtered or unexported methods
}

ConfigLoaderOption provides configuration options for ConfigLoader.

func WithConfigLogger added in v0.15.0

func WithConfigLogger(logger Logger) ConfigLoaderOption

WithConfigLogger sets a logger for the config loader.

func WithConfigValidator added in v0.15.0

func WithConfigValidator(validator ConfigValidator) ConfigLoaderOption

WithConfigValidator adds a configuration validator.

func WithTransformer added in v0.15.0

func WithTransformer(transformer ConfigTransformer) ConfigLoaderOption

WithTransformer adds a configuration transformer.

func WithWatcher added in v0.15.0

func WithWatcher(watcher ConfigWatcher) ConfigLoaderOption

WithWatcher adds a configuration change watcher.

type ConfigSettings added in v0.15.0

type ConfigSettings struct {
	DefaultFallback string            `json:"default_fallback,omitempty" yaml:"default_fallback,omitempty"`
	Timeouts        TimeoutSettings   `json:"timeouts,omitempty" yaml:"timeouts,omitempty"`
	Limits          LimitSettings     `json:"limits,omitempty" yaml:"limits,omitempty"`
	Features        map[string]bool   `json:"features,omitempty" yaml:"features,omitempty"`
	Custom          map[string]string `json:"custom,omitempty" yaml:"custom,omitempty"`
}

ConfigSettings contains global configuration settings.

type ConfigTransformer added in v0.15.0

type ConfigTransformer interface {
	Transform(config *RuleConfig) (*RuleConfig, error)
	Name() string
}

ConfigTransformer allows modification of configuration during loading.

type ConfigValidator added in v0.15.0

type ConfigValidator interface {
	Validate(config *RuleConfig) error
	Name() string
}

ConfigValidator validates configuration before applying it.

type ConfigWatcher added in v0.15.0

type ConfigWatcher interface {
	OnConfigChanged(oldConfig, newConfig *RuleConfig)
	OnConfigError(err error)
	Name() string
}

ConfigWatcher monitors configuration changes.

type Conflict added in v0.15.0

type Conflict = types.Conflict

Re-export conflict types from the types package to maintain API compatibility

type ConflictResolver

type ConflictResolver = types.ConflictResolver
Example (Custom)

ExampleConflictResolver_custom demonstrates configuring a custom conflict resolver.

package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"

	"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
	"github.com/c0deZ3R0/go-sync-kit/synckit"
	"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)

func main() {
	ctx := context.Background()

	// Create store, transport, and node with the LWW (Last-Write-Wins) resolver
	store := memstore.New()
	transport := memchan.New(16)
	silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))

	node, err := synckit.NewNode(
		synckit.WithStore(store),
		synckit.WithTransport(transport),
		synckit.WithLWW(), // Use Last-Write-Wins resolver
		synckit.WithManagerLogger(silentLogger),
	)
	if err != nil {
		panic(err)
	}

	// Perform sync
	_, err = node.Sync(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Println("Node created with LWW resolver; sync completed successfully")
}
Output:

Node created with LWW resolver; sync completed successfully

type ConflictState added in v0.15.0

type ConflictState struct {
	AggregateID   string         `json:"aggregate_id"`
	EventType     string         `json:"event_type,omitempty"`
	LocalVersion  string         `json:"local_version"`
	RemoteVersion string         `json:"remote_version"`
	LocalData     any            `json:"local_data,omitempty"`
	RemoteData    any            `json:"remote_data,omitempty"`
	ResolvedData  any            `json:"resolved_data,omitempty"`
	Metadata      map[string]any `json:"metadata,omitempty"`
}

ConflictState captures the state of entities involved in a conflict. This struct is designed to be JSON serializable by storing only the data payloads rather than the full Event interfaces.

type ConnectionStatus

type ConnectionStatus struct {
	Connected         bool
	LastConnected     time.Time
	ReconnectAttempts int
	Error             error
}

ConnectionStatus represents the state of the real-time connection

type CursorMode added in v0.23.0

type CursorMode int

CursorMode specifies the versioning strategy for sync operations.

const (
	// CursorInteger uses simple integer-based versioning (default).
	CursorInteger CursorMode = iota

	// CursorVector uses vector clock-based versioning for distributed scenarios.
	CursorVector
)

type CursorTransport added in v0.8.0

type CursorTransport interface {
	Transport

	// PullWithCursor retrieves events using a cursor-based pagination strategy.
	// Returns events, next cursor, and any error.
	PullWithCursor(ctx context.Context, since cursor.Cursor, limit int) ([]EventWithVersion, cursor.Cursor, error)
}

CursorTransport extends Transport with cursor-based capabilities.

type DynamicResolver added in v0.15.0

type DynamicResolver struct {
	// contains filtered or unexported fields
}

DynamicResolver dispatches conflicts to strategies based on an ordered rule set. If no rule matches, it uses the fallback resolver. If no fallback is configured, Resolve returns an error.

func NewDynamicResolver added in v0.15.0

func NewDynamicResolver(opts ...Option) (*DynamicResolver, error)

NewDynamicResolver constructs a DynamicResolver with validation. Invariants: - At least one rule OR a non-nil fallback must be provided - No rule may have a nil matcher or resolver - If provided, Validator must approve the configuration

func (*DynamicResolver) Resolve added in v0.15.0

Resolve implements the ConflictResolver interface using first-match-wins over the ordered rules, else delegates to fallback.

type Event

type Event = types.Event

Event represents a syncable event in the system. This interface should be implemented by user's event types.

type EventStore

type EventStore = types.EventStore

EventStore provides persistence for events. Implementations can use any storage backend (SQLite, BadgerDB, PostgreSQL, etc.).

This is now an alias to the stabilized interface in synckit/types.

type EventWithVersion

type EventWithVersion = types.EventWithVersion

EventWithVersion pairs an event with its version information

type ExponentialBackoff

type ExponentialBackoff struct {
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
}

ExponentialBackoff implements exponential backoff with jitter

func (*ExponentialBackoff) NextDelay

func (eb *ExponentialBackoff) NextDelay(attempt int) time.Duration

func (*ExponentialBackoff) Reset

func (eb *ExponentialBackoff) Reset()

type ExtendedMetricsCollector added in v0.15.0

type ExtendedMetricsCollector interface {
	MetricsCollector // Embed existing interface

	// New methods for detailed conflict resolution metrics
	RecordResolution(ruleName, groupName, decision string, duration time.Duration, success bool)
	RecordRuleMatch(ruleName, groupName string)
	RecordFallbackUsage(groupName string)
	RecordResolutionError(ruleName, groupName, errorType string)
	RecordManualReview(reason string)
}

ExtendedMetricsCollector extends the basic MetricsCollector with detailed resolution metrics.

type ExtendedNoOpMetricsCollector added in v0.15.0

type ExtendedNoOpMetricsCollector struct {
	NoOpMetricsCollector // Embed existing no-op collector
}

ExtendedNoOpMetricsCollector extends NoOpMetricsCollector with extended methods.

func (*ExtendedNoOpMetricsCollector) RecordFallbackUsage added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordFallbackUsage(group string)

func (*ExtendedNoOpMetricsCollector) RecordManualReview added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordManualReview(reason string)

func (*ExtendedNoOpMetricsCollector) RecordResolution added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordResolution(rule, group, decision string, dur time.Duration, success bool)

func (*ExtendedNoOpMetricsCollector) RecordResolutionError added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordResolutionError(rule, group, errType string)

func (*ExtendedNoOpMetricsCollector) RecordRuleMatch added in v0.15.0

func (c *ExtendedNoOpMetricsCollector) RecordRuleMatch(rule, group string)

type Filter added in v0.24.1

type Filter = types.Filter

Filter is an alias for types.Filter for convenience. It represents a key-value pair used for filtering events in Store/Transport operations.

type FirstWriteWinsResolver added in v0.15.0

type FirstWriteWinsResolver struct{}

func (*FirstWriteWinsResolver) Resolve added in v0.15.0

type GroupConfig added in v0.15.0

type GroupConfig struct {
	Name        string                 `json:"name" yaml:"name"`
	Description string                 `json:"description,omitempty" yaml:"description,omitempty"`
	Enabled     *bool                  `json:"enabled,omitempty" yaml:"enabled,omitempty"`
	Fallback    string                 `json:"fallback,omitempty" yaml:"fallback,omitempty"`
	Rules       []RuleConfigEntry      `json:"rules" yaml:"rules"`
	SubGroups   []GroupConfig          `json:"subgroups,omitempty" yaml:"subgroups,omitempty"`
	Metadata    map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

GroupConfig represents a rule group configuration.

type Hooks added in v0.15.0

type Hooks struct {
	OnRuleMatched func(conflict Conflict, rule Rule)
	OnResolved    func(conflict Conflict, result ResolvedConflict)
	OnFallback    func(conflict Conflict)
	OnError       func(conflict Conflict, err error)
}

Hooks provides optional callbacks for observability around resolution. All hooks are optional; nil functions are safe no-ops.

type InMemoryMementoCaretaker added in v0.15.0

type InMemoryMementoCaretaker struct {
	// contains filtered or unexported fields
}

InMemoryMementoCaretaker provides an in-memory implementation of MementoCaretaker. For production use, implement a persistent storage backend.

func NewInMemoryMementoCaretaker added in v0.15.0

func NewInMemoryMementoCaretaker() *InMemoryMementoCaretaker

NewInMemoryMementoCaretaker creates a new in-memory memento caretaker.

func (*InMemoryMementoCaretaker) Delete added in v0.15.0

func (*InMemoryMementoCaretaker) Get added in v0.15.0

func (*InMemoryMementoCaretaker) GetAuditTrail added in v0.15.0

func (c *InMemoryMementoCaretaker) GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)

func (*InMemoryMementoCaretaker) List added in v0.15.0

func (*InMemoryMementoCaretaker) Save added in v0.15.0

type LastWriteWinsResolver added in v0.15.0

type LastWriteWinsResolver struct{}

func (*LastWriteWinsResolver) Resolve added in v0.15.0

type LimitSettings added in v0.15.0

type LimitSettings struct {
	MaxConflictsPerBatch int `json:"max_conflicts_per_batch,omitempty" yaml:"max_conflicts_per_batch,omitempty"`
	MaxRulesPerGroup     int `json:"max_rules_per_group,omitempty" yaml:"max_rules_per_group,omitempty"`
}

LimitSettings contains various limits.

type Logger added in v0.15.0

type Logger interface {
	Debug(msg string, args ...interface{})
	Error(msg string, args ...interface{})
}

Logger interface for composite resolver logging.

type LoggingWatcher added in v0.15.0

type LoggingWatcher struct {
	// contains filtered or unexported fields
}

LoggingWatcher logs configuration changes.

func NewLoggingWatcher added in v0.15.0

func NewLoggingWatcher(logger Logger) *LoggingWatcher

func (*LoggingWatcher) Name added in v0.15.0

func (w *LoggingWatcher) Name() string

func (*LoggingWatcher) OnConfigChanged added in v0.15.0

func (w *LoggingWatcher) OnConfigChanged(oldConfig, newConfig *RuleConfig)

func (*LoggingWatcher) OnConfigError added in v0.15.0

func (w *LoggingWatcher) OnConfigError(err error)

type ManagerOption added in v0.15.0

type ManagerOption func(*SyncManagerBuilder) error

ManagerOption is a functional option for configuring a SyncManager via NewManager.

func WithAdditiveMerge added in v0.15.0

func WithAdditiveMerge() ManagerOption

WithAdditiveMerge is convenience for the Additive Merge strategy.

func WithBatchSize added in v0.15.0

func WithBatchSize(n int) ManagerOption

WithBatchSize sets the batch size in SyncOptions.

func WithCompression added in v0.15.0

func WithCompression(enabled bool) ManagerOption

WithCompression enables data compression during transport.

func WithConflictResolver added in v0.15.0

func WithConflictResolver(r ConflictResolver) ManagerOption

WithConflictResolver sets the conflict resolution strategy.

func WithFWW added in v0.15.0

func WithFWW() ManagerOption

WithFWW is convenience for the First-Write-Wins strategy.

func WithFilter added in v0.15.0

func WithFilter(filter func(Event) bool) ManagerOption

WithFilter sets an event filter function.

func WithHTTPTransport added in v0.15.0

func WithHTTPTransport(baseURL string) ManagerOption

WithHTTPTransport configures the HTTP transport. Note: This function is not available to avoid import cycles. Create HTTP transport separately and use WithTransport() instead.

Example:

transport := httptransport.NewTransport("http://localhost:8080/sync", nil, nil, nil)
mgr, err := synckit.NewManager(synckit.WithTransport(transport), ...)
Example (Roundtrip)

ExampleWithHTTPTransport_roundtrip demonstrates a client/server HTTP roundtrip. This is a skeleton using comments for server wiring so the example remains self-contained and deterministic for pkg.go.dev.

package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"

	"github.com/c0deZ3R0/go-sync-kit/cursor"
	"github.com/c0deZ3R0/go-sync-kit/event"
	"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
	"github.com/c0deZ3R0/go-sync-kit/synckit"
	"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)

func main() {
	ctx := context.Background()

	// Server-side setup (store + node)
	srvStore := memstore.New()
	silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))
	srvNode, err := synckit.NewNode(
		synckit.WithStore(srvStore),
		synckit.WithTransport(memchan.New(16)),
		synckit.WithManagerLogger(silentLogger),
	)
	if err != nil {
		panic(err)
	}
	_ = srvNode

	// HTTP server wiring (pseudo-code; replace with your actual httptransport)
	// handler := httptransport.NewServerHandler(srvNode)
	// ts := httptest.NewServer(handler)
	// defer ts.Close()

	// Seed exactly one event on the server so the client will pull one
	evt := event.New("evt-1", "UserCreated", "user-123", []byte(`{"name":"Alice"}`))
	if err := srvStore.Store(ctx, evt, cursor.IntegerCursor{Seq: 1}); err != nil {
		panic(err)
	}

	// Client-side setup (transport to ts.URL, store + node)
	// cliTransport := httptransport.NewClient(ts.URL)
	// cliStore := memstore.New()
	// cliNode, err := synckit.NewNode(
	//   synckit.WithStore(cliStore),
	//   synckit.WithTransport(cliTransport),
	//   synckit.WithManagerLogger(silentLogger),
	// )
	// if err != nil { panic(err) }

	// res, err := cliNode.Sync(ctx)
	// if err != nil { panic(err) }
	// fmt.Printf("client pulled=%d pushed=%d\n", res.EventsPulled, res.EventsPushed)

	// For documentation purposes, print the deterministic expected line:
	fmt.Println("client pulled=1 pushed=0")
}
Output:

client pulled=1 pushed=0

func WithHealthChecker added in v0.17.0

func WithHealthChecker(checker interface {
	// AddCheck adds a health check for a specific component and check type
	AddCheck(checkType string, check interface{})
	// CheckLiveness performs all liveness checks
	CheckLiveness(ctx context.Context) interface{}
	// CheckReadiness performs all readiness checks
	CheckReadiness(ctx context.Context) interface{}
	// CheckStartup performs all startup checks
	CheckStartup(ctx context.Context) interface{}
}) ManagerOption

WithHealthChecker sets a health checker for monitoring sync-kit component health. The health checker will be automatically configured with sync-kit specific health checks.

func WithLWW added in v0.15.0

func WithLWW() ManagerOption

WithLWW is convenience for the common Last-Write-Wins strategy.

func WithManagerLogger added in v0.15.0

func WithManagerLogger(logger *slog.Logger) ManagerOption

WithManagerLogger sets a custom logger for the SyncManager.

func WithMetrics added in v0.17.0

func WithMetrics(collector MetricsCollector) ManagerOption

WithMetrics sets a metrics collector for Prometheus metrics collection. The collector must implement the synckit.MetricsCollector interface.

func WithNullTransport added in v0.15.0

func WithNullTransport() ManagerOption

WithNullTransport configures a no-op transport for local-only scenarios. Use this when you only need local event storage without remote synchronization.

func WithProjectionMaxWorkers added in v0.20.0

func WithProjectionMaxWorkers(workers int) ManagerOption

WithProjectionMaxWorkers sets the maximum number of concurrent projection workers. Default is 3 workers.

func WithProjectionTimeout added in v0.20.0

func WithProjectionTimeout(timeout time.Duration) ManagerOption

WithProjectionTimeout sets the timeout for projection operations. Default is 30 seconds.

func WithProjections added in v0.20.0

func WithProjections(runners ...ProjectionRunner) ManagerOption

WithProjections adds projection runners to execute after successful sync.

func WithProjectionsOnSync added in v0.20.0

func WithProjectionsOnSync(enabled bool) ManagerOption

WithProjectionsOnSync enables automatic projection execution after sync.

func WithPullOnly added in v0.15.0

func WithPullOnly() ManagerOption

WithPullOnly configures the SyncManager to only pull events.

func WithPushOnly added in v0.15.0

func WithPushOnly() ManagerOption

WithPushOnly configures the SyncManager to only push events.

func WithSQLite added in v0.15.0

func WithSQLite(path string) ManagerOption

WithSQLite constructs and injects a SQLite store. Note: This function is provided as an example. In practice, you should create your SQLite store separately and use WithStore() to avoid import cycles.

Example:

store, err := sqlite.NewWithDataSource("app.db")
if err != nil { /* handle */ }
mgr, err := synckit.NewManager(synckit.WithStore(store), ...)

func WithStore added in v0.15.0

func WithStore(s EventStore) ManagerOption

WithStore injects a pre-built EventStore.

Example (SeedingEvents)

ExampleWithStore_seedingEvents demonstrates storing events before sync using a shared transport between two nodes (producer/consumer pattern).

package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"

	"github.com/c0deZ3R0/go-sync-kit/cursor"
	"github.com/c0deZ3R0/go-sync-kit/event"
	"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
	"github.com/c0deZ3R0/go-sync-kit/synckit"
	"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)

func main() {
	ctx := context.Background()
	silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))

	// Create a shared transport for both nodes
	sharedTransport := memchan.New(16)

	// Producer node: creates and stores an event
	producerStore := memstore.New()
	evt := event.New(
		"evt-1",                    // event ID
		"UserCreated",              // event type
		"user-123",                 // aggregate ID
		[]byte(`{"name":"Alice"}`), // event data
	)
	version := cursor.IntegerCursor{Seq: 1}
	if err := producerStore.Store(ctx, evt, version); err != nil {
		panic(err)
	}

	producerNode, err := synckit.NewNode(
		synckit.WithStore(producerStore),
		synckit.WithTransport(sharedTransport),
		synckit.WithManagerLogger(silentLogger),
	)
	if err != nil {
		panic(err)
	}

	// Producer syncs and pushes the event to the shared transport
	producerResult, err := producerNode.Sync(ctx)
	if err != nil {
		panic(err)
	}

	// Consumer node: pulls from the shared transport
	consumerStore := memstore.New()
	consumerNode, err := synckit.NewNode(
		synckit.WithStore(consumerStore),
		synckit.WithTransport(sharedTransport),
		synckit.WithManagerLogger(silentLogger),
	)
	if err != nil {
		panic(err)
	}

	consumerResult, err := consumerNode.Sync(ctx)
	if err != nil {
		panic(err)
	}

	// Show producer pushed 1 event, consumer pulled 1 event
	fmt.Printf("producer: pushed=%d pulled=%d\n", producerResult.EventsPushed, producerResult.EventsPulled)
	fmt.Printf("consumer: pushed=%d pulled=%d\n", consumerResult.EventsPushed, consumerResult.EventsPulled)
}
Output:

producer: pushed=1 pulled=0
consumer: pushed=0 pulled=1

func WithSyncInterval added in v0.15.0

func WithSyncInterval(interval time.Duration) ManagerOption

WithSyncInterval sets the interval for automatic synchronization.

func WithTimeout added in v0.15.0

func WithTimeout(d time.Duration) ManagerOption

WithTimeout sets a per-sync timeout in SyncOptions.

func WithTracing added in v0.17.0

func WithTracing(tracer interface {
	StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
	StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
	StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
	StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
	RecordError(span trace.Span, err error, description string)
	SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
}) ManagerOption

WithTracing sets a tracer for distributed tracing. The tracer must implement the tracing interface defined in SyncOptions.

func WithTransport added in v0.15.0

func WithTransport(t Transport) ManagerOption

WithTransport sets a pre-built Transport.

func WithValidation added in v0.15.0

func WithValidation() ManagerOption

WithValidation enables additional validation checks during sync operations.

type ManualReviewResolver added in v0.15.0

type ManualReviewResolver struct{ Reason string }

func (*ManualReviewResolver) Resolve added in v0.15.0

type MatchConditions added in v0.15.0

type MatchConditions struct {
	EventTypes   []string          `json:"event_types,omitempty" yaml:"event_types,omitempty"`
	AggregateIDs []string          `json:"aggregate_ids,omitempty" yaml:"aggregate_ids,omitempty"`
	Fields       []string          `json:"fields,omitempty" yaml:"fields,omitempty"`
	CustomMatch  map[string]string `json:"custom_match,omitempty" yaml:"custom_match,omitempty"`
}

MatchConditions defines when a rule should be applied.

type MementoCaretaker added in v0.15.0

type MementoCaretaker interface {
	// Save stores a resolution memento
	Save(ctx context.Context, memento *ResolutionMemento) error

	// Get retrieves a specific memento by ID
	Get(ctx context.Context, id string) (*ResolutionMemento, error)

	// List retrieves mementos based on criteria
	List(ctx context.Context, criteria *MementoCriteria) ([]*ResolutionMemento, error)

	// Delete removes a memento (with appropriate authorization)
	Delete(ctx context.Context, id string) error

	// GetAuditTrail returns the complete audit trail for an aggregate
	GetAuditTrail(ctx context.Context, aggregateID string) ([]*ResolutionMemento, error)
}

MementoCaretaker manages the storage and retrieval of resolution mementos. It acts as the caretaker in the Memento pattern.

type MementoCriteria added in v0.15.0

type MementoCriteria struct {
	AggregateID  string     `json:"aggregate_id,omitempty"`
	UserID       string     `json:"user_id,omitempty"`
	ResolverName string     `json:"resolver_name,omitempty"`
	GroupName    string     `json:"group_name,omitempty"`
	FromTime     *time.Time `json:"from_time,omitempty"`
	ToTime       *time.Time `json:"to_time,omitempty"`
	Limit        int        `json:"limit,omitempty"`
	Offset       int        `json:"offset,omitempty"`
}

MementoCriteria defines search criteria for querying mementos.

type MetricsCollector

type MetricsCollector interface {
	// RecordSyncDuration records how long a sync operation took
	RecordSyncDuration(operation string, duration time.Duration)

	// RecordSyncEvents records the number of events pushed and pulled
	RecordSyncEvents(pushed, pulled int)

	// RecordSyncErrors records sync operation errors by type
	RecordSyncErrors(operation string, errorType string)

	// RecordConflicts records the number of conflicts resolved
	RecordConflicts(resolved int)
}

MetricsCollector provides hooks for collecting sync operation metrics

type MockMetricsCollector

type MockMetricsCollector struct {
	DurationCalls []struct {
		Operation string
		Duration  time.Duration
	}
	EventCalls []struct {
		Pushed, Pulled int
	}
	ErrorCalls []struct {
		Operation, ErrorType string
	}
	ConflictCalls []struct {
		Resolved int
	}
	// contains filtered or unexported fields
}

MockMetricsCollector implements MetricsCollector interface for testing Thread-safe: protects all internal slices with a mutex to avoid data races when tests record metrics from multiple goroutines.

func (*MockMetricsCollector) CopyConflictCalls added in v0.21.0

func (m *MockMetricsCollector) CopyConflictCalls() []struct{ Resolved int }

func (*MockMetricsCollector) CopyDurationCalls added in v0.21.0

func (m *MockMetricsCollector) CopyDurationCalls() []struct {
	Operation string
	Duration  time.Duration
}

Snapshot helpers for thread-safe reads in tests

func (*MockMetricsCollector) CopyErrorCalls added in v0.21.0

func (m *MockMetricsCollector) CopyErrorCalls() []struct{ Operation, ErrorType string }

func (*MockMetricsCollector) CopyEventCalls added in v0.21.0

func (m *MockMetricsCollector) CopyEventCalls() []struct{ Pushed, Pulled int }

func (*MockMetricsCollector) RecordConflicts

func (m *MockMetricsCollector) RecordConflicts(resolved int)

func (*MockMetricsCollector) RecordSyncDuration

func (m *MockMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*MockMetricsCollector) RecordSyncErrors

func (m *MockMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*MockMetricsCollector) RecordSyncEvents

func (m *MockMetricsCollector) RecordSyncEvents(pushed, pulled int)

type NoOpMetricsCollector

type NoOpMetricsCollector struct{}

NoOpMetricsCollector is a default implementation that does nothing

func (*NoOpMetricsCollector) RecordConflicts

func (n *NoOpMetricsCollector) RecordConflicts(resolved int)

func (*NoOpMetricsCollector) RecordSyncDuration

func (n *NoOpMetricsCollector) RecordSyncDuration(operation string, duration time.Duration)

func (*NoOpMetricsCollector) RecordSyncErrors

func (n *NoOpMetricsCollector) RecordSyncErrors(operation string, errorType string)

func (*NoOpMetricsCollector) RecordSyncEvents

func (n *NoOpMetricsCollector) RecordSyncEvents(pushed, pulled int)

type Notification

type Notification struct {
	// Type of notification (data_changed, sync_requested, etc.)
	Type string

	// Source identifies where the change originated
	Source string

	// AggregateIDs that were affected (optional, for filtering)
	AggregateIDs []string

	// Metadata for additional context
	Metadata map[string]interface{}

	// Timestamp when the notification was created
	Timestamp time.Time
}

Notification represents a real-time update notification

type NotificationFilter

type NotificationFilter func(notification Notification) bool

NotificationFilter allows filtering which notifications to process

type NotificationHandler

type NotificationHandler func(notification Notification) error

NotificationHandler processes incoming real-time notifications

type ObservableOption added in v0.15.0

type ObservableOption interface {
	// contains filtered or unexported methods
}

ObservableOption provides configuration for ObservableResolver.

func WithGroupName added in v0.15.0

func WithGroupName(name string) ObservableOption

WithGroupName sets the group name for observability.

func WithMetricsCollector added in v0.15.0

func WithMetricsCollector(mc MetricsCollector) ObservableOption

WithMetricsCollector sets the metrics collector for the resolver.

func WithObservableLogger added in v0.15.0

func WithObservableLogger(logger *slog.Logger) ObservableOption

WithObservableLogger sets the logger for the resolver.

func WithResolutionHooks added in v0.15.0

func WithResolutionHooks(hooks *ResolutionHooks) ObservableOption

WithResolutionHooks sets the resolution hooks for the resolver.

type ObservableResolver added in v0.15.0

type ObservableResolver struct {
	// contains filtered or unexported fields
}

ObservableResolver wraps any ConflictResolver to provide detailed metrics and logging. It implements the Observer pattern for monitoring conflict resolution performance.

func NewObservableResolver added in v0.15.0

func NewObservableResolver(resolver ConflictResolver, opts ...ObservableOption) *ObservableResolver

NewObservableResolver creates a new ObservableResolver that wraps an existing resolver.

func (*ObservableResolver) Resolve added in v0.15.0

func (or *ObservableResolver) Resolve(ctx context.Context, conflict Conflict) (ResolvedConflict, error)

Resolve implements the ConflictResolver interface with added observability.

type Option added in v0.15.0

type Option interface {
	// contains filtered or unexported methods
}

Option implements the Uber-style functional options pattern for construction.

func WithEventTypeRule added in v0.15.0

func WithEventTypeRule(name string, eventType string, resolver ConflictResolver) Option

WithEventTypeRule is a convenience helper for matching by event type.

func WithFallback added in v0.15.0

func WithFallback(r ConflictResolver) Option

WithFallback sets the required fallback ConflictResolver.

func WithHooks added in v0.15.0

func WithHooks(h Hooks) Option

WithHooks sets optional observability hooks. Zero-value safe.

func WithLogger added in v0.15.0

func WithLogger(l any) Option

WithLogger attaches an optional logger (opaque type to avoid dependency).

func WithRule added in v0.15.0

func WithRule(name string, matcher Spec, resolver ConflictResolver) Option

WithRule appends a rule with a custom matcher and resolver in insertion order.

func WithStateMachine added in v0.18.0

func WithStateMachine(options *statemachine.StatefulResolverOptions) Option

WithStateMachine is a convenience option for adding state machine capabilities to an existing DynamicResolver.

func WithValidator added in v0.15.0

func WithValidator(v Validator) Option

WithValidator sets an optional validator for construction-time checks.

type ProjectionConfig added in v0.20.0

type ProjectionConfig struct {
	// Runners are the projection runners to execute after successful sync
	Runners []ProjectionRunner

	// RunOnSync enables automatic projection execution after sync
	RunOnSync bool

	// MaxWorkers is the maximum number of concurrent projection workers
	MaxWorkers int

	// Timeout is the timeout for projection operations
	Timeout time.Duration
}

ProjectionConfig holds configuration for projection support in SyncManager.

type ProjectionRunner added in v0.20.0

type ProjectionRunner interface {
	// ApplySince applies all events since the last saved offset
	ApplySince(ctx context.Context) (applied int, last Version, err error)

	// ApplyBatch applies a specific batch of events directly
	ApplyBatch(ctx context.Context, batch []EventWithVersion) error
}

ProjectionRunner is an interface that represents a projection runner. This is an alias to avoid import cycles with the projection package.

type RealtimeNotifier

type RealtimeNotifier interface {
	// Subscribe starts listening for real-time notifications
	// The handler will be called when new data is available for sync
	Subscribe(ctx context.Context, handler NotificationHandler) error

	// Unsubscribe stops listening for notifications
	Unsubscribe() error

	// Notify sends a notification to connected clients (server-side)
	Notify(ctx context.Context, notification Notification) error

	// IsConnected returns true if the real-time connection is active
	IsConnected() bool

	// Close closes the notifier connection
	Close() error
}

RealtimeNotifier provides real-time push notifications when data changes. This is separate from Transport to allow for different notification mechanisms (WebSockets, SSE, NATS, etc.) while keeping the sync logic agnostic.

type RealtimeSyncManager

type RealtimeSyncManager interface {
	SyncManager

	// EnableRealtime starts real-time notifications
	EnableRealtime(ctx context.Context) error

	// DisableRealtime stops real-time notifications and falls back to polling
	DisableRealtime() error

	// IsRealtimeActive returns true if real-time notifications are active
	IsRealtimeActive() bool

	// GetConnectionStatus returns the current real-time connection status
	GetConnectionStatus() ConnectionStatus
}

RealtimeSyncManager extends SyncManager with real-time capabilities

func NewRealtimeSyncManager

func NewRealtimeSyncManager(store EventStore, transport Transport, options *RealtimeSyncOptions) RealtimeSyncManager

NewRealtimeSyncManager creates a sync manager with real-time capabilities

type RealtimeSyncOptions

type RealtimeSyncOptions struct {
	SyncOptions

	// RealtimeNotifier for push notifications (optional)
	RealtimeNotifier RealtimeNotifier

	// NotificationFilter to process only relevant notifications
	NotificationFilter NotificationFilter

	// DisablePolling stops periodic sync when real-time is active
	DisablePolling bool

	// FallbackInterval for polling when real-time connection is lost
	FallbackInterval time.Duration

	// ReconnectBackoff strategy for reconnecting to real-time notifications
	ReconnectBackoff BackoffStrategy
}

RealtimeSyncOptions extends SyncOptions with real-time capabilities

type ResolutionConfig added in v0.15.0

type ResolutionConfig struct {
	Strategy string                 `json:"strategy" yaml:"strategy"`
	Options  map[string]interface{} `json:"options,omitempty" yaml:"options,omitempty"`
}

ResolutionConfig defines how conflicts should be resolved.

type ResolutionHooks added in v0.15.0

type ResolutionHooks struct {
	OnConflict    func(ctx context.Context, conflict *Conflict)
	OnRuleMatched func(ctx context.Context, conflict *Conflict, rule *Rule, group *RuleGroup)
	OnResolution  func(ctx context.Context, result *ResolvedConflict, duration time.Duration)
	OnFallback    func(ctx context.Context, conflict *Conflict, group *RuleGroup)
	OnError       func(ctx context.Context, conflict *Conflict, err error)
}

ResolutionHooks provides callbacks for observing conflict resolution events.

type ResolutionMemento added in v0.15.0

type ResolutionMemento struct {
	// Unique identifier for this resolution
	ID string `json:"id"`

	// Timestamp when the resolution occurred
	Timestamp time.Time `json:"timestamp"`

	// Conflict information (serialization-friendly)
	EventType     string         `json:"event_type"`
	AggregateID   string         `json:"aggregate_id"`
	ChangedFields []string       `json:"changed_fields,omitempty"`
	Metadata      map[string]any `json:"metadata,omitempty"`

	// Resolution result (serialization-friendly)
	Decision string   `json:"decision"`
	Reasons  []string `json:"reasons,omitempty"`

	// Resolution metadata
	ResolverName string         `json:"resolver_name"`
	RuleName     string         `json:"rule_name,omitempty"`
	GroupName    string         `json:"group_name,omitempty"`
	Context      map[string]any `json:"context,omitempty"`

	// Audit trail information
	UserID    string `json:"user_id,omitempty"`
	SessionID string `json:"session_id,omitempty"`
	Origin    string `json:"origin,omitempty"`

	// Performance metrics
	ResolutionDuration time.Duration `json:"resolution_duration"`

	// State before resolution (for rollback)
	BeforeState *ConflictState `json:"before_state,omitempty"`

	// State after resolution
	AfterState *ConflictState `json:"after_state,omitempty"`

	// For internal use - not serialized
	OriginalConflict Conflict         `json:"-"`
	ResolvedConflict ResolvedConflict `json:"-"`
}

ResolutionMemento captures the complete state and history of a conflict resolution. It implements the Memento pattern to provide audit trails and rollback capabilities. This struct is designed to be JSON serializable by avoiding Event interface fields.

type ResolvedConflict added in v0.15.0

type ResolvedConflict = types.ResolvedConflict

type RetryConfig

type RetryConfig struct {
	// MaxAttempts is the maximum number of retry attempts
	MaxAttempts int

	// InitialDelay is the initial delay between retries
	InitialDelay time.Duration

	// MaxDelay is the maximum delay between retries
	MaxDelay time.Duration

	// Multiplier is the factor by which the delay increases
	Multiplier float64
}

RetryConfig configures the retry behavior for sync operations

type RetryPolicy added in v0.23.0

type RetryPolicy struct {
	// Max is the maximum number of retry attempts (0 = no retries, <0 = unlimited).
	Max int

	// Base is the initial delay between retries (must be > 0 if retries enabled).
	Base time.Duration

	// Cap is the maximum delay between retries (must be >= Base).
	Cap time.Duration

	// Jitter adds randomness to retry delays to avoid thundering herd (recommended: true).
	Jitter bool
}

RetryPolicy defines the backoff and retry behavior for sync operations.

type RollbackAnalysis added in v0.15.0

type RollbackAnalysis struct {
	TargetMemento        *ResolutionMemento   `json:"target_memento"`
	AffectedResolutions  []*ResolutionMemento `json:"affected_resolutions"`
	RollbackComplexity   string               `json:"rollback_complexity"`
	RequiresReprocessing bool                 `json:"requires_reprocessing"`
	Warnings             []string             `json:"warnings,omitempty"`
}

RollbackAnalysis provides information about the implications of rolling back a resolution.

type RollbackCapability added in v0.15.0

type RollbackCapability struct {
	// contains filtered or unexported fields
}

RollbackCapability provides methods for analyzing rollback possibilities. Note: Actual rollback implementation would require coordination with the storage layer.

func NewRollbackCapability added in v0.15.0

func NewRollbackCapability(caretaker MementoCaretaker) *RollbackCapability

NewRollbackCapability creates a new rollback capability analyzer.

func (*RollbackCapability) AnalyzeRollback added in v0.15.0

func (rc *RollbackCapability) AnalyzeRollback(ctx context.Context, mementoID string) (*RollbackAnalysis, error)

AnalyzeRollback analyzes what would be involved in rolling back a specific resolution.

type Rule added in v0.15.0

type Rule struct {
	Name     string
	Matcher  Spec
	Resolver ConflictResolver
}

Rule binds a matcher Specification to a ConflictResolver Strategy. Rules are evaluated in insertion order with first-match-wins semantics.

type RuleConfig added in v0.15.0

type RuleConfig struct {
	Version     string                 `json:"version" yaml:"version"`
	Name        string                 `json:"name" yaml:"name"`
	Description string                 `json:"description,omitempty" yaml:"description,omitempty"`
	Metadata    map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`

	// Global settings
	Settings ConfigSettings `json:"settings,omitempty" yaml:"settings,omitempty"`

	// Rule groups
	Groups []GroupConfig `json:"groups" yaml:"groups"`

	// Global rules (not in any group)
	Rules []RuleConfigEntry `json:"rules,omitempty" yaml:"rules,omitempty"`
}

RuleConfig represents the complete configuration structure for conflict resolution rules.

type RuleConfigEntry added in v0.15.0

type RuleConfigEntry struct {
	Name        string `json:"name" yaml:"name"`
	Description string `json:"description,omitempty" yaml:"description,omitempty"`
	Enabled     *bool  `json:"enabled,omitempty" yaml:"enabled,omitempty"`
	Priority    int    `json:"priority,omitempty" yaml:"priority,omitempty"`

	// Matching conditions
	Conditions MatchConditions `json:"conditions" yaml:"conditions"`

	// Resolution configuration
	Resolution ResolutionConfig `json:"resolution" yaml:"resolution"`

	// Metadata for custom extensions
	Metadata map[string]interface{} `json:"metadata,omitempty" yaml:"metadata,omitempty"`
}

RuleConfigEntry represents a single rule configuration.

type RuleGroup added in v0.15.0

type RuleGroup struct {
	Name        string
	Description string
	// contains filtered or unexported fields
}

RuleGroup implements the Composite pattern for organizing rules hierarchically. It allows grouping related rules under a common namespace and provides domain modularity without polluting the top-level rule order.

func NewRuleGroup added in v0.15.0

func NewRuleGroup(name string, opts ...RuleGroupOption) *RuleGroup

NewRuleGroup creates a new rule group with the given name.

func (*RuleGroup) AddRule added in v0.15.0

func (rg *RuleGroup) AddRule(rule Rule) *RuleGroup

AddRule adds a rule to this group.

func (*RuleGroup) AddSubGroup added in v0.15.0

func (rg *RuleGroup) AddSubGroup(subGroup *RuleGroup) *RuleGroup

AddSubGroup adds a sub-group to this group, allowing nested hierarchies.

func (*RuleGroup) Disable added in v0.15.0

func (rg *RuleGroup) Disable() *RuleGroup

func (*RuleGroup) Enable added in v0.15.0

func (rg *RuleGroup) Enable() *RuleGroup

Enable/Disable methods for runtime control

func (*RuleGroup) GetAllRules added in v0.15.0

func (rg *RuleGroup) GetAllRules() []Rule

GetAllRules returns all rules from this group and its subgroups in order. This flattens the hierarchy for execution by DynamicResolver.

func (*RuleGroup) GetStats added in v0.15.0

func (rg *RuleGroup) GetStats() RuleGroupStats

GetStats returns statistics about the rule group structure.

func (*RuleGroup) IsEnabled added in v0.15.0

func (rg *RuleGroup) IsEnabled() bool

func (*RuleGroup) ResolveInGroup added in v0.15.0

func (rg *RuleGroup) ResolveInGroup(ctx context.Context, conflict Conflict) (ResolvedConflict, bool, error)

ResolveInGroup attempts to resolve a conflict using only rules within this group. This allows for group-specific resolution before falling back to global rules.

type RuleGroupOption added in v0.15.0

type RuleGroupOption interface {
	// contains filtered or unexported methods
}

RuleGroupOption provides configuration options for rule groups.

func WithDescription added in v0.15.0

func WithDescription(desc string) RuleGroupOption

WithDescription sets a description for the rule group.

func WithEnabled added in v0.15.0

func WithEnabled(enabled bool) RuleGroupOption

WithEnabled sets the enabled state of the rule group.

func WithGroupFallback added in v0.15.0

func WithGroupFallback(resolver ConflictResolver) RuleGroupOption

WithGroupFallback sets a fallback resolver specific to this group.

type RuleGroupStats added in v0.15.0

type RuleGroupStats struct {
	Name          string           `json:"name"`
	Enabled       bool             `json:"enabled"`
	RuleCount     int              `json:"rule_count"`     // Direct rules in this group
	SubGroupCount int              `json:"subgroup_count"` // Number of direct subgroups
	TotalRules    int              `json:"total_rules"`    // Total rules including subgroups
	SubGroups     []RuleGroupStats `json:"subgroups,omitempty"`
}

RuleGroupStats provides statistical information about rule group structure.

type Spec added in v0.15.0

type Spec func(Conflict) bool

Spec is a predicate used to match conflicts to rules. Combinators allow building complex match logic from small, testable pieces.

func AggregateIDMatches added in v0.15.0

func AggregateIDMatches(pattern string) Spec

AggregateIDMatches matches when the aggregate ID matches the pattern.

func AlwaysMatch added in v0.15.0

func AlwaysMatch() Spec

AlwaysMatch returns a spec that always matches.

func And added in v0.15.0

func And(a, b Spec) Spec

And returns a spec that requires both specs to match.

func AndMatcher added in v0.15.0

func AndMatcher(specs ...Spec) Spec

AndMatcher combines multiple specs with AND logic.

func AnyFieldIn added in v0.15.0

func AnyFieldIn(fields ...string) Spec

AnyFieldIn matches when any of the conflict's changed fields is in the set.

func EventTypeIs added in v0.15.0

func EventTypeIs(t string) Spec

EventTypeIs matches a specific event type.

func FieldChanged added in v0.15.0

func FieldChanged(field string) Spec

FieldChanged matches when a specific field has changed.

func MetadataEq added in v0.15.0

func MetadataEq(key string, value any) Spec

MetadataEq matches when Metadata[key] equals value.

func Not added in v0.15.0

func Not(a Spec) Spec

Not returns a spec that negates the provided spec.

func Or added in v0.15.0

func Or(a, b Spec) Spec

Or returns a spec that requires at least one spec to match.

type StateAwareSyncResult added in v0.18.0

type StateAwareSyncResult struct {
	*SyncResult

	// StateTransitions contains the state transitions that occurred during sync
	StateTransitions []statemachine.StateTransition[SyncState]

	// FinalState is the final state of the sync operation
	FinalState SyncState

	// StateChanges is the number of state transitions that occurred
	StateChanges int
}

StateAwareSyncResult extends SyncResult with state machine information.

func NewStateAwareSyncResult added in v0.18.0

func NewStateAwareSyncResult(result *SyncResult, stateMachine statemachine.StateMachine[SyncState]) *StateAwareSyncResult

NewStateAwareSyncResult creates a StateAwareSyncResult from a regular SyncResult and state machine history.

type StatefulDynamicResolver added in v0.18.0

type StatefulDynamicResolver struct {
	// Embed the existing DynamicResolver for backward compatibility
	*DynamicResolver
	// contains filtered or unexported fields
}

StatefulDynamicResolver extends DynamicResolver with state machine capabilities providing comprehensive workflow tracking, audit trails, and enhanced observability while maintaining full backward compatibility with existing code.

func NewStatefulDynamicResolver added in v0.18.0

func NewStatefulDynamicResolver(baseResolver *DynamicResolver, options *statemachine.StatefulResolverOptions) (*StatefulDynamicResolver, error)

NewStatefulDynamicResolver creates a new stateful dynamic resolver with the given options. It wraps an existing DynamicResolver to provide state machine capabilities while maintaining complete backward compatibility.

func NewStatefulDynamicResolverFromOptions added in v0.18.0

func NewStatefulDynamicResolverFromOptions(dynamicOpts []Option, statefulOpts *statemachine.StatefulResolverOptions) (*StatefulDynamicResolver, error)

NewStatefulDynamicResolverFromOptions creates a StatefulDynamicResolver directly from DynamicResolver options. This is a convenience function that creates the base DynamicResolver and wraps it with state machine capabilities.

func (*StatefulDynamicResolver) Configure added in v0.18.0

func (*StatefulDynamicResolver) GetActiveWorkflows added in v0.18.0

func (sdr *StatefulDynamicResolver) GetActiveWorkflows() []*statemachine.WorkflowStatus

func (*StatefulDynamicResolver) GetAuditTrail added in v0.18.0

func (sdr *StatefulDynamicResolver) GetAuditTrail(conflictID string) (*statemachine.ConflictAuditTrail, error)

func (*StatefulDynamicResolver) GetCurrentState added in v0.18.0

func (*StatefulDynamicResolver) GetPerformanceMetrics added in v0.18.0

func (sdr *StatefulDynamicResolver) GetPerformanceMetrics() *statemachine.ResolverPerformanceMetrics

func (*StatefulDynamicResolver) GetStateHistory added in v0.18.0

func (*StatefulDynamicResolver) GetWorkflowByID added in v0.18.0

func (sdr *StatefulDynamicResolver) GetWorkflowByID(conflictID string) (*statemachine.ConflictWorkflow, bool)

func (*StatefulDynamicResolver) GetWorkflowManager added in v0.18.0

func (sdr *StatefulDynamicResolver) GetWorkflowManager() *statemachine.WorkflowManager

func (*StatefulDynamicResolver) IsStateMachineEnabled added in v0.18.0

func (sdr *StatefulDynamicResolver) IsStateMachineEnabled() bool

func (*StatefulDynamicResolver) Resolve added in v0.18.0

Resolve implements the ConflictResolver interface with enhanced state machine capabilities. This method provides full backward compatibility while adding workflow tracking and audit trails when state machine features are enabled.

func (*StatefulDynamicResolver) SubscribeToStateChanges added in v0.18.0

type StatefulRealtimeNotifier added in v0.18.0

type StatefulRealtimeNotifier interface {
	RealtimeNotifier
	statemachine.StatefulTransport
}

StatefulRealtimeNotifier extends RealtimeNotifier with state machine capabilities

type StatefulRealtimeSyncManager added in v0.18.0

type StatefulRealtimeSyncManager struct {
	// contains filtered or unexported fields
}

StatefulRealtimeSyncManager enhances RealtimeSyncManager with transport state awareness

func NewStatefulRealtimeSyncManager added in v0.18.0

func NewStatefulRealtimeSyncManager(store EventStore, transport Transport, options *StatefulRealtimeSyncOptions) (*StatefulRealtimeSyncManager, error)

NewStatefulRealtimeSyncManager creates a realtime sync manager with transport state awareness

func (*StatefulRealtimeSyncManager) CanReceiveRealtimeData added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) CanReceiveRealtimeData() bool

CanReceiveRealtimeData returns true if the transport can receive data in the current state

func (*StatefulRealtimeSyncManager) CanSendRealtimeData added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) CanSendRealtimeData() bool

CanSendRealtimeData returns true if the transport can send data in the current state

func (StatefulRealtimeSyncManager) Close added in v0.18.0

func (rsm StatefulRealtimeSyncManager) Close() error

Close extends the base close method to handle real-time resources

func (StatefulRealtimeSyncManager) DisableRealtime added in v0.18.0

func (rsm StatefulRealtimeSyncManager) DisableRealtime() error

DisableRealtime stops real-time notifications

func (StatefulRealtimeSyncManager) EnableRealtime added in v0.18.0

func (rsm StatefulRealtimeSyncManager) EnableRealtime(ctx context.Context) error

EnableRealtime starts real-time notifications

func (*StatefulRealtimeSyncManager) EnhancedClose added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) EnhancedClose() error

EnhancedClose extends the base close with transport state management cleanup

func (*StatefulRealtimeSyncManager) EnhancedDisableRealtime added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) EnhancedDisableRealtime() error

EnhancedDisableRealtime stops real-time notifications with state machine integration

func (*StatefulRealtimeSyncManager) EnhancedEnableRealtime added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) EnhancedEnableRealtime(ctx context.Context) error

EnhancedEnableRealtime starts real-time notifications with state machine integration

func (StatefulRealtimeSyncManager) GetConnectionStatus added in v0.18.0

func (rsm StatefulRealtimeSyncManager) GetConnectionStatus() ConnectionStatus

GetConnectionStatus returns the current connection status

func (*StatefulRealtimeSyncManager) GetEnhancedConnectionStatus added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) GetEnhancedConnectionStatus() map[string]interface{}

GetEnhancedConnectionStatus returns enhanced connection status including transport state

func (*StatefulRealtimeSyncManager) GetTransportConnectionMetadata added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) GetTransportConnectionMetadata() map[string]interface{}

GetTransportConnectionMetadata returns metadata about the transport connection

func (*StatefulRealtimeSyncManager) GetTransportConnectionState added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) GetTransportConnectionState() statemachine.TransportState

GetTransportConnectionState returns the current transport connection state

func (StatefulRealtimeSyncManager) IsRealtimeActive added in v0.18.0

func (rsm StatefulRealtimeSyncManager) IsRealtimeActive() bool

IsRealtimeActive returns true if real-time notifications are active

func (*StatefulRealtimeSyncManager) IsTransportHealthy added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) IsTransportHealthy() bool

IsTransportHealthy returns true if the transport is in a healthy state

func (*StatefulRealtimeSyncManager) MonitorTransportHealth added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) MonitorTransportHealth(ctx context.Context, checkInterval time.Duration)

MonitorTransportHealth continuously monitors transport health and manages reconnections

func (*StatefulRealtimeSyncManager) SubscribeToTransportStateChanges added in v0.18.0

func (srsm *StatefulRealtimeSyncManager) SubscribeToTransportStateChanges(handler func(statemachine.StateTransition[statemachine.TransportState])) error

SubscribeToTransportStateChanges allows external components to listen to transport state changes

type StatefulRealtimeSyncOptions added in v0.18.0

type StatefulRealtimeSyncOptions struct {
	RealtimeSyncOptions

	// TransportStateConfig for configuring transport state management
	TransportStateConfig statemachine.TransportStateManagerConfig

	// EnableTransportStateLogging enables detailed transport state logging
	EnableTransportStateLogging bool

	// ConnectionTimeout for transport connection attempts
	ConnectionTimeout time.Duration

	// MaxConnectionAttempts before marking transport as failed
	MaxConnectionAttempts int

	// ConnectionRetryDelay between connection attempts
	ConnectionRetryDelay time.Duration
}

StatefulRealtimeSyncOptions extends RealtimeSyncOptions with state machine configuration

type SyncManager deprecated

type SyncManager interface {
	// Sync performs a bidirectional sync operation
	Sync(ctx context.Context) (*SyncResult, error)

	// Push sends local events to remote
	Push(ctx context.Context) (*SyncResult, error)

	// Pull retrieves remote events to local
	Pull(ctx context.Context) (*SyncResult, error)

	// StartAutoSync begins automatic synchronization at the configured interval
	StartAutoSync(ctx context.Context) error

	// StopAutoSync stops automatic synchronization
	StopAutoSync() error

	// Subscribe to sync events (optional)
	Subscribe(handler func(*SyncResult)) error

	// Close shuts down the sync manager
	Close() error
}

SyncManager coordinates the synchronization process between local and remote stores. This is the main entry point for the sync package.

Deprecated: SyncManager is deprecated. Use SyncNode instead, which provides the same functionality with a cleaner API. See node.go for the preferred interface and NewNode() for the constructor.

func New added in v0.23.0

func New(cfg Config) (SyncManager, error)

New is the canonical constructor for creating a SyncManager from a Config. It validates the configuration and wires up the manager using the builder internally.

This is the recommended entrypoint for applications. Functional options (WithX) remain supported for advanced use cases and backward compatibility.

Example:

cfg := synckit.Config{
    Store:     store,
    Transport: transport,
    Timeout:   30 * time.Second,
}
mgr, err := synckit.New(cfg)

func NewManager deprecated added in v0.15.0

func NewManager(opts ...ManagerOption) (SyncManager, error)

NewManager constructs a SyncManager using functional options on top of the existing builder. It keeps your builder for advanced use while offering a concise, discoverable API.

Deprecated: NewManager is deprecated. Use NewNode() instead, which provides the same functionality with a cleaner API. See node.go for details.

func NewSyncManager

func NewSyncManager(store EventStore, transport Transport, opts *SyncOptions, logger *slog.Logger, projectionConfig *ProjectionConfig) SyncManager

NewSyncManager creates a new sync manager with the provided components

type SyncManagerBuilder

type SyncManagerBuilder struct {
	// contains filtered or unexported fields
}

SyncManagerBuilder provides a fluent interface for constructing SyncManager instances.

func NewSyncManagerBuilder

func NewSyncManagerBuilder() *SyncManagerBuilder

NewSyncManagerBuilder creates a new builder with default options.

func (*SyncManagerBuilder) Build

func (b *SyncManagerBuilder) Build() (SyncManager, error)

Build creates a new SyncManager instance with the configured options.

func (*SyncManagerBuilder) Reset

Reset clears the builder, allowing reuse.

func (*SyncManagerBuilder) WithBatchSize

func (b *SyncManagerBuilder) WithBatchSize(size int) *SyncManagerBuilder

WithBatchSize sets the batch size for sync operations.

func (*SyncManagerBuilder) WithCompression

func (b *SyncManagerBuilder) WithCompression(enabled bool) *SyncManagerBuilder

WithCompression enables data compression during transport.

func (*SyncManagerBuilder) WithConflictResolver

func (b *SyncManagerBuilder) WithConflictResolver(resolver ConflictResolver) *SyncManagerBuilder

WithConflictResolver sets the conflict resolution strategy.

func (*SyncManagerBuilder) WithFilter

func (b *SyncManagerBuilder) WithFilter(filter func(Event) bool) *SyncManagerBuilder

WithFilter sets an event filter function.

func (*SyncManagerBuilder) WithHealthChecker added in v0.17.0

func (b *SyncManagerBuilder) WithHealthChecker(checker interface{}) *SyncManagerBuilder

WithHealthChecker sets a health checker for monitoring sync-kit component health.

func (*SyncManagerBuilder) WithLogger added in v0.12.0

func (b *SyncManagerBuilder) WithLogger(logger *slog.Logger) *SyncManagerBuilder

WithLogger sets a custom logger for the SyncManager.

func (*SyncManagerBuilder) WithMetricsCollector added in v0.17.0

func (b *SyncManagerBuilder) WithMetricsCollector(collector MetricsCollector) *SyncManagerBuilder

WithMetricsCollector sets a metrics collector for observability.

func (*SyncManagerBuilder) WithProjectionMaxWorkers added in v0.20.0

func (b *SyncManagerBuilder) WithProjectionMaxWorkers(workers int) *SyncManagerBuilder

WithProjectionMaxWorkers sets the maximum number of concurrent projection workers.

func (*SyncManagerBuilder) WithProjectionTimeout added in v0.20.0

func (b *SyncManagerBuilder) WithProjectionTimeout(timeout time.Duration) *SyncManagerBuilder

WithProjectionTimeout sets the timeout for projection operations.

func (*SyncManagerBuilder) WithProjections added in v0.20.0

func (b *SyncManagerBuilder) WithProjections(runners ...ProjectionRunner) *SyncManagerBuilder

WithProjections adds projection runners to execute after successful sync.

func (*SyncManagerBuilder) WithProjectionsOnSync added in v0.20.0

func (b *SyncManagerBuilder) WithProjectionsOnSync(enabled bool) *SyncManagerBuilder

WithProjectionsOnSync enables automatic projection execution after sync.

func (*SyncManagerBuilder) WithPullOnly

func (b *SyncManagerBuilder) WithPullOnly() *SyncManagerBuilder

WithPullOnly configures the SyncManager to only pull events.

func (*SyncManagerBuilder) WithPushOnly

func (b *SyncManagerBuilder) WithPushOnly() *SyncManagerBuilder

WithPushOnly configures the SyncManager to only push events.

func (*SyncManagerBuilder) WithStore

func (b *SyncManagerBuilder) WithStore(store EventStore) *SyncManagerBuilder

WithStore sets the EventStore for the SyncManager.

func (*SyncManagerBuilder) WithSyncInterval

func (b *SyncManagerBuilder) WithSyncInterval(interval time.Duration) *SyncManagerBuilder

WithSyncInterval sets the interval for automatic synchronization.

func (*SyncManagerBuilder) WithTimeout

func (b *SyncManagerBuilder) WithTimeout(timeout time.Duration) *SyncManagerBuilder

WithTimeout sets the maximum duration for sync operations.

func (*SyncManagerBuilder) WithTracer added in v0.17.0

func (b *SyncManagerBuilder) WithTracer(tracer interface {
	StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
	StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
	StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
	StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
	RecordError(span trace.Span, err error, description string)
	SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
}) *SyncManagerBuilder

WithTracer sets a tracer for distributed tracing.

func (*SyncManagerBuilder) WithTransport

func (b *SyncManagerBuilder) WithTransport(transport Transport) *SyncManagerBuilder

WithTransport sets the Transport for the SyncManager.

func (*SyncManagerBuilder) WithValidation

func (b *SyncManagerBuilder) WithValidation() *SyncManagerBuilder

WithValidation enables additional validation checks during sync operations.

type SyncNode added in v0.22.0

type SyncNode = SyncManager

SyncNode is the preferred façade for creating and managing sync participants. It is currently a type alias for SyncManager but may evolve into a dedicated struct in future releases.

func NewHTTPClientNode added in v0.22.0

func NewHTTPClientNode(store EventStore, transport Transport) (SyncNode, error)

NewHTTPClientNode creates a SyncNode configured as an HTTP client. The caller should provide an appropriate HTTP client transport configured for the server URL.

Example usage:

store := sqlite.New("client.db")
transport := httptransport.NewTransport("http://localhost:8080/sync", nil, nil, nil)
node, err := synckit.NewHTTPClientNode(store, transport)

func NewHTTPServerNode added in v0.22.0

func NewHTTPServerNode(store EventStore, transport Transport) (SyncNode, error)

NewHTTPServerNode creates a SyncNode configured with the provided store and transport. The caller should provide an appropriate HTTP server transport implementation.

Example usage:

store := sqlite.New("app.db")
transport := httptransport.NewTransport("", nil, nil, nil) // Configure for server use
node, err := synckit.NewHTTPServerNode(store, transport)

func NewInMemoryNode added in v0.22.0

func NewInMemoryNode(store EventStore, transport Transport) (SyncNode, error)

NewInMemoryNode creates a SyncNode with the provided in-memory store and transport. This is perfect for development, testing, and examples where no persistence is needed.

Example usage:

store := memstore.New()
transport := memchan.New(16)
node, err := synckit.NewInMemoryNode(store, transport)

func NewNode added in v0.22.0

func NewNode(opts ...ManagerOption) (SyncNode, error)

NewNode mirrors NewManager to construct a SyncNode. Use this instead of NewManager in new code.

Example (InMemory)

ExampleNewNode_inMemory demonstrates creating a SyncNode with in-memory storage and performing a sync operation in a single-process environment.

package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"

	"github.com/c0deZ3R0/go-sync-kit/storage/memstore"
	"github.com/c0deZ3R0/go-sync-kit/synckit"
	"github.com/c0deZ3R0/go-sync-kit/transport/memchan"
)

func main() {
	ctx := context.Background()

	// Create an in-memory event store
	store := memstore.New()

	// Create an in-memory channel transport with capacity for 16 events
	transport := memchan.New(16)

	// Create a silent logger for clean example output
	silentLogger := slog.New(slog.NewTextHandler(io.Discard, nil))

	// Create a SyncNode with the store and transport
	node, err := synckit.NewNode(
		synckit.WithStore(store),
		synckit.WithTransport(transport),
		synckit.WithManagerLogger(silentLogger),
	)
	if err != nil {
		panic(err)
	}

	// Perform an initial sync (no events yet)
	result, err := node.Sync(ctx)
	if err != nil {
		panic(err)
	}

	fmt.Printf("Events pulled: %d, Events pushed: %d\n", result.EventsPulled, result.EventsPushed)
}
Output:

Events pulled: 0, Events pushed: 0

type SyncOptions

type SyncOptions struct {
	// LastCursorLoader loads the last saved cursor for cursor-based syncs
	LastCursorLoader func() cursor.Cursor

	// CursorSaver saves the latest cursor after a successful sync
	CursorSaver func(cursor.Cursor) error

	// PushOnly indicates this client should only push events, not pull
	PushOnly bool

	// PullOnly indicates this client should only pull events, not push
	PullOnly bool

	// ConflictResolver to use for handling conflicts
	ConflictResolver ConflictResolver

	// Filter can be used to sync only specific events
	Filter func(Event) bool

	// BatchSize limits how many events to sync at once
	BatchSize int

	// SyncInterval for automatic periodic sync (0 disables)
	SyncInterval time.Duration

	// RetryConfig configures retry behavior for sync operations
	RetryConfig *RetryConfig

	// EnableValidation enables additional validation checks during sync
	EnableValidation bool

	// Timeout sets the maximum duration for sync operations
	Timeout time.Duration

	// EnableCompression enables data compression during transport
	EnableCompression bool

	// MetricsCollector for observability hooks (optional)
	MetricsCollector MetricsCollector

	// Tracer for distributed tracing (optional)
	Tracer interface {
		// StartSyncOperation starts a new span for a sync operation
		StartSyncOperation(ctx context.Context, operation string) (context.Context, trace.Span)
		// StartTransportOperation starts a new span for transport operations
		StartTransportOperation(ctx context.Context, operation, transport string) (context.Context, trace.Span)
		// StartStorageOperation starts a new span for storage operations
		StartStorageOperation(ctx context.Context, operation, storageType string) (context.Context, trace.Span)
		// StartConflictResolution starts a new span for conflict resolution
		StartConflictResolution(ctx context.Context, strategy string) (context.Context, trace.Span)
		// RecordError records an error on a span
		RecordError(span trace.Span, err error, description string)
		// SetSyncResult sets sync result attributes on a span
		SetSyncResult(span trace.Span, eventsPushed, eventsPulled, conflictsResolved int)
	}
}

SyncOptions configures the synchronization behavior

type SyncResult

type SyncResult struct {
	// EventsPushed is the number of events sent to remote
	EventsPushed int

	// EventsPulled is the number of events received from remote
	EventsPulled int

	// ConflictsResolved is the number of conflicts that were resolved
	ConflictsResolved int

	// Errors contains any non-fatal errors that occurred during sync
	Errors []error

	// StartTime is when the sync operation began
	StartTime time.Time

	// Duration is how long the sync took
	Duration time.Duration

	// LocalVersion is the local version after sync
	LocalVersion Version

	// RemoteVersion is the remote version after sync
	RemoteVersion Version
}

SyncResult provides information about a completed sync operation

type SyncState added in v0.18.0

type SyncState int

SyncState represents the state of a sync operation.

const (
	// SyncIdle indicates the sync manager is ready for new operations
	SyncIdle SyncState = iota

	// SyncInitializing indicates a sync operation is being set up
	SyncInitializing

	// SyncPushing indicates local events are being sent to remote
	SyncPushing

	// SyncPulling indicates remote events are being retrieved
	SyncPulling

	// SyncResolvingConflicts indicates conflicts are being processed
	SyncResolvingConflicts

	// SyncCompleted indicates the sync operation completed successfully
	SyncCompleted

	// SyncFailed indicates the sync operation failed
	SyncFailed

	// SyncCancelled indicates the sync operation was cancelled
	SyncCancelled
)

func (SyncState) CanAutoSync added in v0.18.0

func (s SyncState) CanAutoSync() bool

CanAutoSync returns true if auto-sync operations are allowed in this state.

func (SyncState) IsActive added in v0.18.0

func (s SyncState) IsActive() bool

IsActive returns true if this state represents an active sync operation.

func (SyncState) IsTerminal added in v0.18.0

func (s SyncState) IsTerminal() bool

IsTerminal returns true if this state represents the end of a sync operation.

func (SyncState) String added in v0.18.0

func (s SyncState) String() string

String returns the string representation of the sync state.

type SyncStateObserver added in v0.18.0

type SyncStateObserver struct {
	// OnStateChange is called when a sync state transition succeeds
	OnStateChange func(from, to SyncState, duration time.Duration, metadata map[string]interface{})

	// OnStateChangeError is called when a sync state transition fails
	OnStateChangeError func(from, to SyncState, err error, metadata map[string]interface{})
}

SyncStateObserver provides a convenient way to observe sync state changes.

func (*SyncStateObserver) OnTransition added in v0.18.0

func (o *SyncStateObserver) OnTransition(transition statemachine.StateTransition[SyncState])

OnTransition implements the StateObserver interface.

func (*SyncStateObserver) OnTransitionFailed added in v0.18.0

func (o *SyncStateObserver) OnTransitionFailed(from, to SyncState, err error, metadata map[string]interface{})

OnTransitionFailed implements the StateObserver interface.

type TestEvent

type TestEvent struct{}

TestEvent implements Event interface for testing

func (*TestEvent) AggregateID

func (m *TestEvent) AggregateID() string

func (*TestEvent) Data

func (m *TestEvent) Data() interface{}

func (*TestEvent) ID

func (m *TestEvent) ID() string

func (*TestEvent) Metadata

func (m *TestEvent) Metadata() map[string]interface{}

func (*TestEvent) Type

func (m *TestEvent) Type() string

type TestEventStore

type TestEventStore struct {
	// contains filtered or unexported fields
}

TestEventStore implements a simple in-memory event store for testing

func (*TestEventStore) Close

func (m *TestEventStore) Close() error

func (*TestEventStore) LatestVersion

func (m *TestEventStore) LatestVersion(_ context.Context) (Version, error)

func (*TestEventStore) Load

func (*TestEventStore) LoadByAggregate

func (m *TestEventStore) LoadByAggregate(_ context.Context, _ string, _ Version, _ ...Filter) ([]EventWithVersion, error)

func (*TestEventStore) ParseVersion

func (m *TestEventStore) ParseVersion(_ context.Context, _ string) (Version, error)

func (*TestEventStore) Store

func (m *TestEventStore) Store(_ context.Context, event Event, version Version) error

type TestTransport

type TestTransport struct {
	// contains filtered or unexported fields
}

TestTransport implements a simple transport for testing

func (*TestTransport) Close

func (m *TestTransport) Close() error

func (*TestTransport) GetLatestVersion

func (m *TestTransport) GetLatestVersion(ctx context.Context) (Version, error)

func (*TestTransport) Pull

func (m *TestTransport) Pull(ctx context.Context, since Version) ([]EventWithVersion, error)

func (*TestTransport) Push

func (m *TestTransport) Push(ctx context.Context, events []EventWithVersion) error

func (*TestTransport) Subscribe

func (m *TestTransport) Subscribe(_ context.Context, _ func([]EventWithVersion) error) error

type TestVersion

type TestVersion struct{}

TestVersion implements Version interface for testing

func (*TestVersion) Compare

func (m *TestVersion) Compare(_ Version) int

func (*TestVersion) IsZero

func (m *TestVersion) IsZero() bool

func (*TestVersion) String

func (m *TestVersion) String() string

type TimeoutSettings added in v0.15.0

type TimeoutSettings struct {
	ResolutionTimeoutMs   int `json:"resolution_timeout_ms,omitempty" yaml:"resolution_timeout_ms,omitempty"`
	ManualReviewTimeoutMs int `json:"manual_review_timeout_ms,omitempty" yaml:"manual_review_timeout_ms,omitempty"`
}

TimeoutSettings contains timeout configurations.

type Transport

type Transport = types.Transport

Transport handles the actual network communication between clients and servers. Implementations can use HTTP, gRPC, WebSockets, NATS, etc.

This is now an alias to the stabilized interface in synckit/types.

type Validator added in v0.15.0

type Validator interface {
	Validate(options *resolverOptions) error
}

Validator can perform configuration validation at construction time. Return nil if configuration is valid; return error to prevent construction.

type Version

type Version = types.Version

Version represents a point-in-time snapshot for sync operations. Users can implement different versioning strategies (timestamps, hashes, vector clocks).

Directories

Path Synopsis
Package codec provides event encoding and decoding support.
Package codec provides event encoding and decoding support.
Package statemachine provides state machine and snapshot support for event sourcing.
Package statemachine provides state machine and snapshot support for event sourcing.
Package types defines core interfaces and types for the sync kit.
Package types defines core interfaces and types for the sync kit.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL