sqlite

package
v0.24.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2025 License: MIT Imports: 17 Imported by: 0

README

SQLite EventStore

A production-ready SQLite implementation of the go-sync-kit EventStore interface.

Features

  • Full EventStore Interface: Complete implementation of all required methods
  • Transaction Safety: All write operations use database transactions
  • Connection Pooling: Configurable connection pool with sensible defaults
  • Context Support: All operations respect context cancellation and timeouts
  • Thread Safety: Safe for concurrent use across multiple goroutines
  • Custom Error Types: Specific error types for better error handling
  • Performance Optimized: Proper indexing and efficient queries
  • Comprehensive Tests: Full test coverage including benchmarks

Installation

go get github.com/mattn/go-sqlite3

Usage

Basic Usage
package main

import (
    "context"
    "log"
    
    "github.com/c0deZ3R0/go-sync-kit/storage/sqlite"
    "github.com/c0deZ3R0/go-sync-kit/synckit"
)

func main() {
    // Create a new SQLite store (simple approach)
    store, err := sqlite.NewWithDataSource("events.db")
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close()
    
    // Use with SyncManager
    transport := &MyTransport{} // Your transport implementation
    options := &synckit.SyncOptions{
        BatchSize: 100,
    }
    
    syncManager := synckit.NewSyncManager(store, transport, options)
    
    // Perform sync operations
    ctx := context.Background()
    result, err := syncManager.Sync(ctx)
    if err != nil {
        log.Printf("Sync failed: %v", err)
        return
    }
    
    log.Printf("Synced: %d events pushed, %d events pulled", 
        result.EventsPushed, result.EventsPulled)
}
Advanced Configuration
// Full configuration with logging
logger := log.New(os.Stdout, "[SQLite EventStore] ", log.LstdFlags)

config := &sqlite.Config{
    DataSourceName:  "events.db",
    Logger:          logger,            // Optional: for debugging and monitoring
    TableName:       "my_events",       // Optional: custom table name
    EnableWAL:       true,              // Enable WAL mode for better concurrency
    MaxOpenConns:    25,                // Maximum number of open connections
    MaxIdleConns:    5,                 // Maximum number of idle connections
    ConnMaxLifetime: time.Hour,         // Maximum connection lifetime
    ConnMaxIdleTime: 5 * time.Minute,   // Maximum connection idle time
}

store, err := sqlite.New(config)
if err != nil {
    log.Fatal(err)
}
defer store.Close()
Working with Versions

The SQLite implementation uses IntegerVersion for simple sequential versioning:

// Create a version
version := sqlite.IntegerVersion(42)

// Compare versions
if version.Compare(sqlite.IntegerVersion(41)) > 0 {
    fmt.Println("Version 42 is greater than 41")
}

// Check if version is zero
if version.IsZero() {
    fmt.Println("This is the initial version")
}

// Convert to string
fmt.Printf("Version: %s\n", version.String())
Monitoring and Observability
// Get database statistics
stats := store.Stats()
fmt.Printf("Open connections: %d\n", stats.OpenConnections)
fmt.Printf("In use: %d\n", stats.InUse)
fmt.Printf("Idle: %d\n", stats.Idle)

Database Schema

The SQLite store creates the following table structure:

CREATE TABLE events (
    version         INTEGER PRIMARY KEY AUTOINCREMENT,
    id              TEXT NOT NULL UNIQUE,
    aggregate_id    TEXT NOT NULL,
    event_type      TEXT NOT NULL,
    data            TEXT,
    metadata        TEXT,
    created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Indexes for performance
CREATE INDEX idx_aggregate_id ON events (aggregate_id);
CREATE INDEX idx_version ON events (version);
CREATE INDEX idx_created_at ON events (created_at);

Error Handling

The SQLite store defines custom error types for better error handling:

import "errors"

// Check for specific errors
_, err := store.Load(ctx, version)
if errors.Is(err, sqlite.ErrStoreClosed) {
    log.Println("Store has been closed")
} else if errors.Is(err, sqlite.ErrIncompatibleVersion) {
    log.Println("Version type is not compatible")
}

Available error types:

  • ErrIncompatibleVersion: Version type is not IntegerVersion
  • ErrEventNotFound: Requested event was not found
  • ErrStoreClosed: Store has been closed

Performance Considerations

Connection Pool Settings

For high-throughput applications, tune the connection pool:

config := &sqlite.Config{
    EnableWAL:       true,              // Essential for concurrent operations
    MaxOpenConns:    50,                // Higher for more concurrent operations
    MaxIdleConns:    10,                // More idle connections for faster reuse
    ConnMaxLifetime: 30 * time.Minute, // Shorter lifetime for busy systems
    ConnMaxIdleTime: 2 * time.Minute,  // Shorter idle time to free resources
}
Batch Operations

When storing many events, consider batching them:

ctx := context.Background()
for _, event := range events {
    if err := store.Store(ctx, event, version); err != nil {
        log.Printf("Failed to store event %s: %v", event.ID(), err)
    }
}
WAL Mode

For better concurrent performance, enable WAL mode via configuration:

// Recommended: Use the EnableWAL config option
config := &sqlite.Config{
    DataSourceName: "events.db",
    EnableWAL:      true,  // Automatically adds ?_journal_mode=WAL
}
store, err := sqlite.New(config)

// Alternative: Manually specify in connection string
config := &sqlite.Config{
    DataSourceName: "events.db?_journal_mode=WAL",
}
store, err := sqlite.New(config)

Testing

Run the test suite:

# Run all tests
go test ./storage/sqlite

# Run tests with race detection
go test -race ./storage/sqlite

# Run benchmarks
go test -bench=. ./storage/sqlite

# Run tests with coverage
go test -cover ./storage/sqlite

Thread Safety

The SQLite EventStore is completely thread-safe and can be safely used across multiple goroutines. All database operations are protected by appropriate synchronization mechanisms.

Best Practices

  1. Always use defer store.Close() to ensure proper cleanup
  2. Configure connection pools based on your application's concurrency needs
  3. Use context.WithTimeout() for database operations in production
  4. Monitor database statistics in production environments
  5. Handle errors appropriately using the provided error types
  6. Consider WAL mode for applications with high read concurrency

Limitations

  • Uses IntegerVersion only - not compatible with vector clocks or other version types
  • SQLite limitations apply (single writer, file-based storage)
  • Large datasets may require additional optimization (partitioning, archiving)

Logging and Observability

The improved SQLite EventStore now supports comprehensive logging:

// Enable logging to stdout
logger := log.New(os.Stdout, "[EventStore] ", log.LstdFlags)

config := &sqlite.Config{
    DataSourceName: "events.db",
    Logger:         logger,
}

store, err := sqlite.New(config)
// Logs will show:
// [EventStore] Opening database: events.db
// [EventStore] Connection pool configured: MaxOpen=25, MaxIdle=5
// [EventStore] Successfully initialized with table: events

Migration from Mock Implementation

Replace your mock EventStore with the SQLite implementation:

// Before
store := &MockEventStore{}

// After (simple)
store, err := sqlite.NewWithDataSource("events.db")
if err != nil {
    log.Fatal(err)
}
defer store.Close()

// After (with configuration)
config := sqlite.DefaultConfig("events.db")
config.Logger = log.New(os.Stdout, "[DB] ", log.LstdFlags)
store, err := sqlite.New(config)
if err != nil {
    log.Fatal(err)
}
defer store.Close()

All interface methods remain the same, so no other code changes are required.

Documentation

Overview

Package sqlite provides a SQLite implementation of the go-sync-kit EventStore.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrIncompatibleVersion = errors.New("incompatible version type: expected cursor.IntegerCursor")
	ErrEventNotFound       = errors.New("event not found")
	ErrStoreClosed         = errors.New("store is closed")
)

Custom errors for better error handling

Functions

func ParseVersion added in v0.3.0

func ParseVersion(s string) (cursor.IntegerCursor, error)

ParseVersion parses a version string into a cursor.IntegerCursor. This is useful for HTTP transport and other external integrations.

Types

type Config

type Config struct {
	// DataSourceName is the connection string for the SQLite database.
	// For production use, consider enabling WAL mode for better concurrency.
	// Example: "file:events.db?_journal_mode=WAL"
	DataSourceName string

	// EnableWAL enables Write-Ahead Logging mode for better concurrency.
	// This is recommended for production use and is enabled by default.
	// When true, automatically appends "?_journal_mode=WAL" to DataSourceName.
	EnableWAL bool

	// Logger is an optional logger for logging internal operations and errors.
	// If nil, logging is disabled by default (logs to io.Discard).
	Logger *log.Logger

	// TableName is the name of the table to store events.
	// Defaults to "events" if empty.
	TableName string

	// Connection pool settings for production workloads.
	// Defaults: MaxOpen=25, MaxIdle=5, Lifetime=1h, IdleTime=5m
	MaxOpenConns    int           // Default: 25 - Maximum number of open connections
	MaxIdleConns    int           // Default: 5  - Maximum number of idle connections
	ConnMaxLifetime time.Duration // Default: 1h - Maximum lifetime of connections
	ConnMaxIdleTime time.Duration // Default: 5m - Maximum idle time before closing
}

Config holds configuration options for the SQLiteEventStore.

Production-ready defaults are applied by DefaultConfig() including:

  • WAL mode enabled for better concurrency
  • Connection pool with 25 max open, 5 max idle connections
  • Connection lifetimes of 1 hour max, 5 minutes max idle

func DefaultConfig

func DefaultConfig(dataSourceName string) *Config

DefaultConfig returns a Config with production-ready defaults for SQLite.

Default settings include:

  • WAL mode enabled for better concurrency
  • Connection pool: 25 max open, 5 max idle connections
  • Connection lifetime: 1 hour max, 5 minutes max idle
  • Table name: "events"
  • Logging disabled (to io.Discard)

type EventEnvelope added in v0.7.0

type EventEnvelope struct {
	ID            string         `json:"id"`
	Type          string         `json:"type"`
	AggregateID   string         `json:"aggregate_id"`
	Data          map[string]any `json:"data,omitempty"`
	Metadata      map[string]any `json:"metadata,omitempty"`
	OriginNode    string         `json:"origin_node,omitempty"`
	OriginCounter uint64         `json:"origin_counter,omitempty"`
	Seq           uint64         `json:"seq,omitempty"`
}

func PullWithCursor added in v0.7.0

func PullWithCursor(ctx context.Context, db *sql.DB, since cursor.Cursor, limit int) ([]EventEnvelope, cursor.Cursor, error)

type EventRow added in v0.7.0

type EventRow struct {
	Seq           uint64
	ID            string
	Type          string
	AggregateID   string
	DataJSON      string
	MetadataJSON  string
	OriginNode    sql.NullString
	OriginCounter sql.NullInt64
}

type SQLiteEventStore

type SQLiteEventStore struct {
	// contains filtered or unexported fields
}

SQLiteEventStore implements the sync.EventStore interface for SQLite.

func New

func New(config *Config) (*SQLiteEventStore, error)

New creates a new SQLiteEventStore from a Config. If config is nil, DefaultConfig will be used with an empty DataSourceName.

func NewWithDataSource

func NewWithDataSource(dataSourceName string) (*SQLiteEventStore, error)

NewWithDataSource is a convenience constructor

func (*SQLiteEventStore) Close

func (s *SQLiteEventStore) Close() error

Close closes the database connection.

func (*SQLiteEventStore) LatestVersion

func (s *SQLiteEventStore) LatestVersion(ctx context.Context) (synckit.Version, error)

LatestVersion returns the highest version number in the store.

func (*SQLiteEventStore) Load

Load retrieves all events since a given version with optional filters.

func (*SQLiteEventStore) LoadByAggregate

func (s *SQLiteEventStore) LoadByAggregate(ctx context.Context, aggregateID string, since synckit.Version, filters ...synckit.Filter) ([]synckit.EventWithVersion, error)

LoadByAggregate retrieves events for a specific aggregate since a given version with optional filters.

func (*SQLiteEventStore) ParseVersion added in v0.3.0

func (s *SQLiteEventStore) ParseVersion(ctx context.Context, versionStr string) (synckit.Version, error)

ParseVersion converts a string representation into a cursor.IntegerCursor. This allows external integrations to handle SQLite's integer versioning gracefully.

func (*SQLiteEventStore) Stats

func (s *SQLiteEventStore) Stats() sql.DBStats

Stats returns database statistics for monitoring

func (*SQLiteEventStore) Store

func (s *SQLiteEventStore) Store(ctx context.Context, event synckit.Event, version synckit.Version) error

Store saves an event to the SQLite database. Note: This implementation ignores the 'version' parameter and relies on SQLite's AUTOINCREMENT to assign a new, sequential version.

func (*SQLiteEventStore) StoreBatch added in v0.5.0

func (s *SQLiteEventStore) StoreBatch(ctx context.Context, events []synckit.EventWithVersion) error

StoreBatch stores multiple events in a single transaction for better performance.

type StoredEvent

type StoredEvent struct {
	// contains filtered or unexported fields
}

StoredEvent is a concrete implementation of synckit.Event used for retrieving events from the database. It holds data and metadata as raw JSON.

func (*StoredEvent) AggregateID

func (e *StoredEvent) AggregateID() string

func (*StoredEvent) Data

func (e *StoredEvent) Data() interface{}

func (*StoredEvent) ID

func (e *StoredEvent) ID() string

func (*StoredEvent) Metadata

func (e *StoredEvent) Metadata() map[string]interface{}

func (*StoredEvent) Type

func (e *StoredEvent) Type() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL