store

package module
v0.0.1
Published: Apr 16, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

README

eventsalsa/store

A minimal, production-ready event store for Go.

Features

  • PostgreSQL-backed event store — append-only, immutable event log with BIGSERIAL global positions
  • Optimistic concurrency control — via expected versions enforced at the application and database level
  • Aggregate stream reads — load a full or partial event history with optional version ranges
  • Sequential event reading — read events by global position for building consumers and projections
  • Transaction-first design — all operations accept *sql.Tx; you control transaction boundaries
  • Consumer interfaces — Consumer and ScopedConsumer for event processing
  • SQL migration generator — cmd/migrate-gen generates a ready-to-apply .sql file
  • Event mapping code generator — cmd/eventmap-gen generates type-safe domain event mappings

Quick Start

1. Install
go get github.com/eventsalsa/store

Choose your PostgreSQL driver:

go get github.com/lib/pq

2. Generate Migrations
go run github.com/eventsalsa/store/cmd/migrate-gen -output migrations

Apply the generated file with your preferred migration tool:

psql -h localhost -U postgres -d mydb -f migrations/*_init_event_sourcing.sql

3. Append and Read Events
package main

import (
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "time"

    "github.com/google/uuid"
    _ "github.com/lib/pq"

    "github.com/eventsalsa/store"
    "github.com/eventsalsa/store/postgres"
)

func main() {
    db, err := sql.Open("postgres", "host=localhost user=postgres dbname=mydb sslmode=disable")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    ctx := context.Background()
    s := postgres.NewStore(postgres.DefaultStoreConfig())

    // Append events to a new aggregate
    userID := uuid.New().String()
    payload, _ := json.Marshal(map[string]string{"email": "alice@example.com", "name": "Alice"})

    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer tx.Rollback() //nolint:errcheck

    result, err := s.Append(ctx, tx, store.NoStream(), []store.Event{
        {
            AggregateType: "User",
            AggregateID:   userID,
            EventID:       uuid.New(),
            EventType:     "UserCreated",
            EventVersion:  1,
            Payload:       payload,
            Metadata:      []byte(`{}`),
            CreatedAt:     time.Now(),
        },
    })
    if err != nil {
        log.Fatal(err)
    }

    if err := tx.Commit(); err != nil {
        log.Fatal(err)
    }

    log.Printf("appended at positions %v, aggregate now at version %d",
        result.GlobalPositions, result.ToVersion())

    // Read the aggregate stream
    tx2, _ := db.BeginTx(ctx, nil)
    defer tx2.Rollback() //nolint:errcheck

    stream, err := s.ReadAggregateStream(ctx, tx2, "User", userID, nil, nil)
    if err != nil {
        log.Fatal(err)
    }
    tx2.Commit() //nolint:errcheck

    log.Printf("stream: %d events, current version %d", stream.Len(), stream.Version())
    for _, e := range stream.Events {
        log.Printf("  v%d  %s  pos=%d", e.AggregateVersion, e.EventType, e.GlobalPosition)
    }
}

Core Concepts

Events & Aggregates

store.Event is an immutable value object that you construct before persisting. The store assigns AggregateVersion and GlobalPosition during Append.

event := store.Event{
    AggregateType: "Order",         // logical category of the aggregate
    AggregateID:   orderID,         // string identifier — UUID, email, slug, etc.
    EventID:       uuid.New(),      // idempotency key for the event itself
    EventType:     "OrderPlaced",   // discriminator used by consumers
    EventVersion:  1,               // schema version of the payload
    Payload:       payload,         // serialized domain data (JSON, proto, etc.)
    Metadata:      metadata,        // cross-cutting concerns (request ID, actor, etc.)
    CreatedAt:     time.Now(),
    // optional tracing fields:
    TraceID:       store.NullString{String: traceID, Valid: true},
    CorrelationID: store.NullString{String: corrID, Valid: true},
    CausationID:   store.NullString{String: causID, Valid: true},
}

store.PersistedEvent is what you read back. It adds GlobalPosition and AggregateVersion.

store.Stream wraps the full ordered history of a single aggregate along with helper methods:

stream.Version()  // current aggregate version (0 if empty)
stream.IsEmpty()  // true if no events were found
stream.Len()      // number of events in the stream

store.AppendResult describes the outcome of a write:

result.ToVersion()       // aggregate version after the append
result.FromVersion()     // aggregate version before the append
result.GlobalPositions   // global positions assigned to each event
result.Events            // persisted events with all fields populated

Expected Versions

Expected versions are the mechanism for optimistic concurrency. You declare the state you expect the aggregate to be in before writing.

Constructor — when to use:

  • store.NoStream() — creating a new aggregate; fails if it already exists
  • store.Exact(n) — updating an existing aggregate at a known version; fails on conflict
  • store.Any() — unconditional write; skips version validation entirely

Conflicts return store.ErrOptimisticConcurrency. The database unique constraint on (aggregate_type, aggregate_id, aggregate_version) acts as a final safety net even if two transactions pass the application-level check simultaneously.

// Create — must not already exist
result, err := s.Append(ctx, tx, store.NoStream(), events)

// Update at a known version
result, err := s.Append(ctx, tx, store.Exact(stream.Version()), events)

// Unconditional
result, err := s.Append(ctx, tx, store.Any(), events)

if errors.Is(err, store.ErrOptimisticConcurrency) {
    // reload, reapply command, retry
}

Aggregate Streams

ReadAggregateStream returns the ordered event history for a single aggregate instance. Both version bounds are optional and inclusive.

// Full history
stream, err := s.ReadAggregateStream(ctx, tx, "User", userID, nil, nil)

// From a specific version onwards (e.g., to skip already-processed events)
from := int64(5)
stream, err = s.ReadAggregateStream(ctx, tx, "User", userID, &from, nil)

// A version window
from, to := int64(5), int64(10)
stream, err = s.ReadAggregateStream(ctx, tx, "User", userID, &from, &to)

Sequential Reads

ReadEvents reads from the raw global log in position order, which is the basis for building consumers and projections.

Because global_position is sequence-backed, these positions are unique and sortable, but they are not safe to use as a naive checkpoint frontier under concurrent writers. Async consumers that persist checkpoints should use a gap-aware worker or runtime rather than blindly advancing to the highest position seen.

// Read up to 500 events after global position 0
events, err := s.ReadEvents(ctx, tx, 0, 500)

// Continue from last processed position
events, err = s.ReadEvents(ctx, tx, lastPosition, 500)

GetLatestGlobalPosition returns the highest position currently visible in the log — useful for lightweight wakeup or polling checks without fetching full batches. It is not a safe contiguous high-water mark for checkpoint advancement under concurrent writers.

latest, err := s.GetLatestGlobalPosition(ctx, tx)

Checkpoint safety under concurrent writers: global_position is backed by a PostgreSQL BIGSERIAL sequence, which guarantees uniqueness but not commit order. Under concurrent writers, a lower position may become visible after a higher one has already been returned. Advancing a checkpoint to the highest position seen, without accounting for in-flight gaps, can permanently skip events. Async consumers must use a gap-aware worker or runtime; do not treat the highest position returned by ReadEvents or GetLatestGlobalPosition as a safe checkpoint frontier.

Scoped async filtering is intentionally a worker/runtime concern rather than a store read primitive. If a consumer needs to react to only some aggregate types, establish a safe frontier from the unscoped global stream first, then filter inside the runtime.

Consumers

The consumer package defines the interfaces for event processing.

consumer.Consumer is the base interface:

type AuditLogConsumer struct{}

func (c *AuditLogConsumer) Name() string { return "audit_log.v1" }

func (c *AuditLogConsumer) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    // tx is the processor's transaction — use it for atomic read model + checkpoint updates.
    // Never call tx.Commit() or tx.Rollback() here; the processor owns that.
    _, err := tx.ExecContext(ctx,
        "INSERT INTO audit_log (event_id, event_type, occurred_at) VALUES ($1, $2, $3)",
        event.EventID, event.EventType, event.CreatedAt,
    )
    return err
}

consumer.ScopedConsumer narrows delivery to specific aggregate types. Consumers that implement only Consumer receive all events.

type UserReadModel struct{}

func (p *UserReadModel) Name() string              { return "user_read_model.v1" }
func (p *UserReadModel) AggregateTypes() []string  { return []string{"User"} }

func (p *UserReadModel) Handle(ctx context.Context, tx *sql.Tx, event store.PersistedEvent) error {
    // Only receives events where AggregateType == "User"
    return nil
}

PostgreSQL Implementation

Configuration

postgres.NewStore accepts a *postgres.StoreConfig built with functional options:

s := postgres.NewStore(postgres.NewStoreConfig(
    postgres.WithEventsTable("my_events"),           // default: "events"
    postgres.WithAggregateHeadsTable("agg_heads"),   // default: "aggregate_heads"
    postgres.WithLogger(myLogger),                   // optional; nil disables logging
))

postgres.DefaultStoreConfig() returns a ready-to-use configuration with default table names and no logger.

NOTIFY Support

Configure the store to issue a pg_notify call inside each Append transaction. The notification fires only when the transaction commits — no phantom wakes.

s := postgres.NewStore(postgres.NewStoreConfig(
    postgres.WithNotifyChannel("eventsalsa_events"),
))

Consumers can LISTEN on the same channel to wake up immediately instead of polling on a fixed interval.

Migration Generator

cmd/migrate-gen generates a single .sql file that creates all required tables and indexes.

CLI:

go run github.com/eventsalsa/store/cmd/migrate-gen -output migrations
# writes migrations/20060102150405_init_event_sourcing.sql

go run github.com/eventsalsa/store/cmd/migrate-gen \
  -output migrations \
  -filename 001_events.sql \
  -events-table my_events \
  -aggregate-heads-table my_aggregate_heads

go:generate:

//go:generate go run github.com/eventsalsa/store/cmd/migrate-gen -output migrations -filename init.sql

The generated migration creates:

  • events — append-only event log with global_position BIGSERIAL primary key, event_id UUID UNIQUE, and a UNIQUE (aggregate_type, aggregate_id, aggregate_version) constraint that enforces optimistic concurrency at the database level
  • aggregate_heads — one row per aggregate tracking its current version for O(1) version lookups during Append

Event Mapping Code Generator

cmd/eventmap-gen generates type-safe mapping code between your domain event structs and store.Event / store.PersistedEvent. This keeps your domain model free of infrastructure dependencies.

go run github.com/eventsalsa/store/cmd/eventmap-gen \
  -input internal/domain/events \
  -output internal/infrastructure/generated

See the eventmap-codegen example for a complete demonstration including versioned events and schema evolution patterns.

Examples

Complete, runnable examples are in examples/:

  • basic — connecting, appending events, reading aggregate streams, and reading the global log

  • eventmap-codegen — generating type-safe domain event mappings with eventmap-gen, including versioned payloads and projections

Development

Unit tests:

make test-unit

Integration tests (requires Docker):

make test-integration-local

This starts a PostgreSQL container via docker compose, runs all integration tests, then cleans up.

Manual integration testing:

docker compose up -d
make test-integration
docker compose down

Lint and format:

make lint
make fmt

License

This project is licensed under the MIT License — see the LICENSE file for details.

Documentation

Overview

Package store provides core event sourcing types and persistence interfaces.

This package defines the fundamental building blocks for event sourcing:

  • Event types: Event (before persistence) and PersistedEvent (after persistence)
  • Stream: Full history for a single aggregate
  • Store interfaces: EventStore, EventReader, AggregateStreamReader
  • Optimistic concurrency: ExpectedVersion with Any, NoStream, and Exact modes
  • Observability: Logger interface for optional structured logging

The postgres package provides the PostgreSQL implementation of these interfaces.

Example usage:

store := postgres.NewStore(postgres.DefaultStoreConfig())
tx, _ := db.BeginTx(ctx, nil)
result, err := store.Append(ctx, tx, store.NoStream(), events)
tx.Commit()

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrOptimisticConcurrency indicates a version conflict during append.
	ErrOptimisticConcurrency = errors.New("optimistic concurrency conflict")

	// ErrNoEvents indicates an attempt to append zero events.
	ErrNoEvents = errors.New("no events to append")
)

Functions

This section is empty.

Types

type AggregateStreamReader

type AggregateStreamReader interface {
	// ReadAggregateStream reads all events for a specific aggregate instance and returns
	// them as a Stream containing the aggregate's full history.
	// Events are ordered by aggregate_version ascending.
	//
	// Parameters:
	// - aggregateType: the type of aggregate (e.g., "User", "Order")
	// - aggregateID: the unique identifier of the aggregate instance (can be UUID string, email, etc.)
	// - fromVersion: optional minimum version (inclusive). Pass nil to read from the beginning.
	// - toVersion: optional maximum version (inclusive). Pass nil to read to the end.
	//
	// Examples:
	// - ReadAggregateStream(ctx, tx, "User", "550e8400-e29b-41d4-a716-446655440000", nil, nil) - read all events
	// - ReadAggregateStream(ctx, tx, "User", id, ptr(5), nil) - read from version 5 onwards
	// - ReadAggregateStream(ctx, tx, "User", id, nil, ptr(10)) - read up to version 10
	// - ReadAggregateStream(ctx, tx, "User", id, ptr(5), ptr(10)) - read versions 5-10
	//
	// Returns a Stream with an empty Events slice if no events match the criteria.
	// Use stream.Version() to get the current aggregate version.
	// Use stream.IsEmpty() to check if any events were found.
	ReadAggregateStream(ctx context.Context, tx *sql.Tx, aggregateType string, aggregateID string, fromVersion, toVersion *int64) (Stream, error)
}

AggregateStreamReader defines the interface for reading events for a specific aggregate.

type AppendResult

type AppendResult struct {
	Events          []PersistedEvent
	GlobalPositions []int64
}

AppendResult represents the outcome of an Append operation. It contains only the events that were just committed, not the full history. AppendResult must never imply full history - use Stream for that purpose.

func (AppendResult) FromVersion

func (r AppendResult) FromVersion() int64

FromVersion returns the aggregate version before the append. If no events were appended, returns 0. Otherwise, returns the version immediately before the first appended event.

func (AppendResult) ToVersion

func (r AppendResult) ToVersion() int64

ToVersion returns the aggregate version after the append. If no events were appended, returns 0. Otherwise, returns the AggregateVersion of the last appended event.

type Event

type Event struct {
	CreatedAt     time.Time
	AggregateType string
	EventType     string
	AggregateID   string
	Payload       []byte
	Metadata      []byte
	CausationID   NullString
	CorrelationID NullString
	TraceID       NullString
	EventVersion  int
	EventID       uuid.UUID
}

Event represents an immutable domain event before persistence. Events are value objects without identity until persisted. AggregateVersion and GlobalPosition are assigned by the store during Append.

type EventReader

type EventReader interface {
	// ReadEvents reads events starting from the given global position.
	// Returns up to limit events ordered by global_position ascending.
	//
	// WARNING: global_position is BIGSERIAL-backed. PostgreSQL sequences guarantee
	// uniqueness, not commit order. A lower position allocated by a concurrent
	// transaction may become visible after a higher one has already been returned.
	// Advancing a checkpoint to the highest seen position without accounting for
	// in-flight gaps can permanently skip events. Async consumers must use a
	// gap-aware runtime; do not treat the highest returned position as a safe
	// checkpoint frontier under concurrent writers.
	ReadEvents(ctx context.Context, tx *sql.Tx, fromPosition int64, limit int) ([]PersistedEvent, error)
}

EventReader defines the interface for reading events sequentially.

type EventStore

type EventStore interface {
	// Append atomically appends one or more events within the provided transaction.
	// Events must all belong to the same aggregate instance.
	// Returns an AppendResult containing the persisted events with assigned versions
	// and their global positions, or an error.
	//
	// The expectedVersion parameter controls optimistic concurrency:
	// - Any(): No version check - always succeeds if no other errors
	// - NoStream(): Aggregate must not exist - used for aggregate creation
	// - Exact(N): Aggregate must be at version N - used for normal updates
	//
	// The store automatically assigns AggregateVersion to each event:
	// - Fetches the current version from the aggregate_heads table (O(1) lookup)
	// - Validates against expectedVersion
	// - Assigns consecutive versions starting from (current + 1)
	// - Updates aggregate_heads with the new version
	// - The database unique constraint on (aggregate_type, aggregate_id, aggregate_version)
	//   enforces optimistic concurrency as a last safety net
	//
	// Returns ErrOptimisticConcurrency if expectedVersion validation fails or if
	// another transaction commits conflicting events between the version check and insert
	// (detected via unique constraint violation).
	// Returns ErrNoEvents if events slice is empty.
	//
	// After a successful append:
	// - Use result.ToVersion() to get the new aggregate version
	// - Use result.Events to access the persisted events with all fields populated
	// - Use result.GlobalPositions to get the assigned global positions
	Append(ctx context.Context, tx *sql.Tx, expectedVersion ExpectedVersion, events []Event) (AppendResult, error)
}

EventStore defines the interface for appending events.

type ExpectedVersion

type ExpectedVersion struct {
	// contains filtered or unexported fields
}

ExpectedVersion represents the expected aggregate version for optimistic concurrency control. It is used in the Append operation to declare expectations about the current state of an aggregate.

func Any

func Any() ExpectedVersion

Any returns an ExpectedVersion that skips version validation. Use this when you don't need optimistic concurrency control.

func Exact

func Exact(version int64) ExpectedVersion

Exact returns an ExpectedVersion that enforces the aggregate must be at exactly the specified version. Use this for normal command handling with optimistic concurrency control. The version must be non-negative (>= 0). Note that Exact(0) is equivalent to NoStream().

func NoStream

func NoStream() ExpectedVersion

NoStream returns an ExpectedVersion that enforces the aggregate must not exist. Use this when creating a new aggregate to ensure it doesn't already exist. This is useful for enforcing uniqueness constraints via reservation aggregates.

func (ExpectedVersion) IsAny

func (ev ExpectedVersion) IsAny() bool

IsAny returns true if this is an "Any" expected version (no version check).

func (ExpectedVersion) IsExact

func (ev ExpectedVersion) IsExact() bool

IsExact returns true if this is an "Exact" expected version (aggregate must be at specific version).

func (ExpectedVersion) IsNoStream

func (ev ExpectedVersion) IsNoStream() bool

IsNoStream returns true if this is a "NoStream" expected version (aggregate must not exist).

func (ExpectedVersion) String

func (ev ExpectedVersion) String() string

String returns a string representation of the ExpectedVersion.

func (ExpectedVersion) Value

func (ev ExpectedVersion) Value() int64

Value returns the exact version number if this is an Exact expected version. Returns 0 for Any and NoStream.

type GlobalPositionReader

type GlobalPositionReader interface {
	// GetLatestGlobalPosition returns the highest global_position currently present in the event log.
	// Returns 0 when no events exist.
	//
	// WARNING: Because global_position is BIGSERIAL-backed, the returned value is not a safe
	// checkpoint frontier under concurrent writers. A concurrent transaction holding a lower
	// position may commit after this call returns, making that position invisible to any
	// consumer that has already advanced its checkpoint past it.
	GetLatestGlobalPosition(ctx context.Context, tx *sql.Tx) (int64, error)
}

GlobalPositionReader defines the interface for reading the latest global event position. This is useful for lightweight "new events available" checks without loading full batches.

type Logger

type Logger interface {
	// Debug logs debug-level information for detailed troubleshooting.
	// Typically used for verbose operational details.
	Debug(ctx context.Context, msg string, keyvals ...interface{})

	// Info logs informational messages about normal operations.
	// Used to track significant events during normal execution.
	Info(ctx context.Context, msg string, keyvals ...interface{})

	// Error logs error-level information about failures.
	// Used to track errors that require attention.
	Error(ctx context.Context, msg string, keyvals ...interface{})
}

Logger provides a minimal interface for observability and debugging. It is designed to be optional and non-blocking, with zero overhead when disabled. Users can implement this interface to integrate their preferred logging library.

type NoOpLogger

type NoOpLogger struct{}

NoOpLogger is a logger that does nothing. It can be used as a default when no logging is desired.

func (NoOpLogger) Debug

func (NoOpLogger) Debug(_ context.Context, _ string, _ ...interface{})

Debug implements Logger.

func (NoOpLogger) Error

func (NoOpLogger) Error(_ context.Context, _ string, _ ...interface{})

Error implements Logger.

func (NoOpLogger) Info

func (NoOpLogger) Info(_ context.Context, _ string, _ ...interface{})

Info implements Logger.

type NullString

type NullString struct {
	String string
	Valid  bool // Valid is true if String is not NULL
}

NullString represents a string that may be null. It implements database/sql Scanner and Valuer interfaces for SQL interop, but avoids direct dependency on sql.NullString in public types.

func (*NullString) Scan

func (ns *NullString) Scan(value interface{}) error

Scan implements the sql.Scanner interface.

func (NullString) Value

func (ns NullString) Value() (driver.Value, error)

Value implements the driver.Valuer interface.

type PersistedEvent

type PersistedEvent struct {
	CreatedAt        time.Time
	AggregateType    string
	EventType        string
	AggregateID      string
	CausationID      NullString
	Metadata         []byte
	Payload          []byte
	CorrelationID    NullString
	TraceID          NullString
	GlobalPosition   int64
	AggregateVersion int64
	EventVersion     int
	EventID          uuid.UUID
}

PersistedEvent represents an event that has been stored. It includes the GlobalPosition and AggregateVersion assigned by the event store.

type Stream

type Stream struct {
	AggregateType string
	AggregateID   string
	Events        []PersistedEvent
}

Stream represents the full historical event stream for a single aggregate. It is immutable after creation and is returned from read operations. Stream must never be returned from Append operations.

func (Stream) IsEmpty

func (s Stream) IsEmpty() bool

IsEmpty returns true if the stream contains no events.

func (Stream) Len

func (s Stream) Len() int

Len returns the number of events in the stream.

func (Stream) Version

func (s Stream) Version() int64

Version returns the current version of the aggregate. If the stream is empty (no events), version is 0. Otherwise, version is the AggregateVersion of the last event in the stream.

Directories

Path Synopsis
cmd
eventmap-gen command
Command eventmap-gen generates mapping code between domain events and event sourcing types.
migrate-gen command
Command migrate-gen generates SQL migration files for event sourcing.
consumer
Package consumer provides event consumer interface definitions.
eventmap
Package eventmap provides code generation for mapping between domain events and eventsalsa event sourcing types (store.Event and store.PersistedEvent).
examples
basic command
Package main demonstrates basic usage of the eventsalsa event store.
migrations
Package migrations provides SQL migration generation.
postgres
Package postgres provides a PostgreSQL implementation for the event store.
