es

package
v0.30.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 4, 2026 License: MIT Imports: 19 Imported by: 0

README

Event Sourcing Package

A complete event sourcing framework with optimistic concurrency, snapshots, consumer checkpointing, and projection support.

Overview

The es package stores domain state as a sequence of immutable events rather than mutable records. Aggregates raise events, the repository persists them, and consumers build projections. Snapshots optimize loading aggregates with many events.

flowchart LR
    subgraph "Event Sourcing"
        Aggregate["Aggregate"] -->|raise| Events["Events"]
        Events -->|persist| Store["EventStore"]
        Store -->|subscribe| Consumer["Consumer"]
        Consumer -->|build| Projection["Projection"]
        Store -.->|optimize| Snapshot["Snapshot"]
    end

Import

import "github.com/codewandler/clstr-go/core/es"

Quick Start

// Define an aggregate
type User struct {
    es.BaseAggregate
    Name  string
    Email string
}

func (u *User) GetAggType() string { return "user" }

func (u *User) Register(r es.Registrar) {
    es.RegisterEvents(r, es.Event[NameChanged](), es.Event[EmailChanged]())
}

func (u *User) Apply(evt any) error {
    switch e := evt.(type) {
    case *es.AggregateCreatedEvent:
        return u.BaseAggregate.Apply(evt)
    case *NameChanged:
        u.Name = e.Name
    case *EmailChanged:
        u.Email = e.Email
    }
    return nil
}

func (u *User) ChangeName(name string) error {
    return es.RaiseAndApply(u, &NameChanged{Name: name})
}

// Use it
env := es.NewEnv(es.WithInMemory(), es.WithAggregates(&User{}))
repo := es.NewTypedRepositoryFrom[*User](logger, env.Repository())

user, _ := repo.Create(ctx, "user-123")
user.ChangeName("Alice")
repo.Save(ctx, user)

Core Concepts

Aggregate Interface

The central domain object interface:

type Aggregate interface {
    GetAggType() string      // Type name for stream identification
    GetID() string           // Unique identifier
    SetID(string)
    GetVersion() Version     // Per-aggregate version (1, 2, 3, ...)
    GetSeq() uint64          // Global stream sequence
    Create(id string) error  // Initialize new aggregate
    Register(r Registrar)    // Register event types
    Raise(event any)         // Record uncommitted event
    Apply(event any) error   // Update state from event
    Uncommitted() []any      // Get uncommitted events
    ClearUncommitted()       // Clear after save
}
BaseAggregate

Embeddable base implementation:

type User struct {
    es.BaseAggregate  // Embed this
    Name string
}

// BaseAggregate provides:
// - ID, Version, Seq management
// - CreatedAt timestamp
// - Uncommitted event tracking
// - Create() raises AggregateCreatedEvent
// - IsCreated() check
Version Type

Per-aggregate monotonically increasing version:

type Version uint64

v := es.Version(5)
v.Uint64()                    // 5
v.SlogAttr()                  // slog.Attr for logging
v.SlogAttrWithKey("ver")      // Custom key
Event Helpers
// Raise and apply in one call (validates, raises, applies)
es.RaiseAndApply(aggregate, &NameChanged{Name: "Alice"})

// Deferred version for method chaining
es.RaiseAndApplyD(aggregate, &NameChanged{Name: "Alice"})()

// Conditional raise
aggregate.Checked(SomeCond, func() error {
    return es.RaiseAndApply(aggregate, &SomeEvent{})
})

Event Registration

EventRegistry

Register event types for deserialization:

registry := es.NewRegistry()

// Option 1: Generic helper
registry.Register(es.Event[NameChanged]())
registry.Register(es.Event[EmailChanged]())

// Option 2: Multiple at once
es.RegisterEvents(registry,
    es.Event[NameChanged](),
    es.Event[EmailChanged](),
)

// Option 3: In aggregate's Register method
func (u *User) Register(r es.Registrar) {
    es.RegisterEvents(r, es.Event[NameChanged](), es.Event[EmailChanged]())
}
Envelope

Event storage container:

type Envelope struct {
    ID            string          // Unique event ID
    Seq           uint64          // Global sequence (total ordering)
    Version       Version         // Per-aggregate version
    AggregateType string
    AggregateID   string
    Type          string          // Event type name
    OccurredAt    time.Time
    Data          json.RawMessage // JSON-encoded event
}

EventStore Interface

Low-level event persistence:

type EventStore interface {
    Stream  // Inherits Subscribe()

    Load(ctx context.Context, aggType, aggID string, opts ...StoreLoadOption) ([]Envelope, error)
    Append(ctx context.Context, aggType, aggID string, expectedVersion Version,
           events []Envelope) (*StoreAppendResult, error)
}

Load Options:

es.WithStartAtVersion(es.Version(5))  // Events >= version 5
es.WithStartSeq(1000)                 // Events >= sequence 1000

Concurrency Guarantee:

  • Append() returns ErrConcurrencyConflict if expectedVersion doesn't match
  • Enables optimistic locking without explicit locks

Repository

Application-level aggregate operations:

type Repository interface {
    Load(ctx context.Context, agg Aggregate, opts ...LoadOption) error
    Save(ctx context.Context, agg Aggregate, opts ...SaveOption) error
    CreateSnapshot(ctx, agg, SnapshotSaveOpts) (Snapshot, error)
}

repo := es.NewRepository(logger, store, registry, opts...)

Repository Options:

es.WithSnapshotter(snapshotter)   // Enable snapshots
es.WithRepoCache(cache)           // Add custom cache
es.WithRepoCacheLRU(1000)         // LRU cache with size
es.WithIDGenerator(gen)           // Custom event ID generation
es.WithLoadOpts(...)              // Default load options
es.WithSaveOpts(...)              // Default save options
es.WithMetrics(metrics)           // Prometheus/custom metrics
TypedRepository

Generic wrapper with type safety:

type TypedRepository[T Aggregate] interface {
    GetAggType() string
    New() T
    NewWithID(id string) T

    Create(ctx, aggID string, opts ...SaveOption) (T, error)
    GetOrCreate(ctx, aggID string, opts ...LoadAndSaveOption) (T, error)
    GetByID(ctx, aggID string, opts ...LoadOption) (T, error)

    WithTransaction(ctx, aggID string, fn func(T) error, opts ...WithTransactionOption) error

    Save(ctx, agg T, opts ...SaveOption) error
}

// Create from store
repo := es.NewTypedRepository[*User](logger, store, registry, opts...)

// Create from existing repository
repo := es.NewTypedRepositoryFrom[*User](logger, env.Repository(), opts...)

Load/Save Options:

es.WithSnapshot(true)              // Use snapshots
es.WithUseCache(true)              // Use repository cache (default)
es.WithSnapshotTTL(time.Hour)      // Snapshot expiration (save only)
Transactions

Serial per-aggregate transactions:

repo.WithTransaction(ctx, "user-123", func(user *User) error {
    user.ChangeName("Bob")
    return nil  // Errors abort transaction
}, es.WithCreate(), es.WithSnapshot(true))

Properties:

  • Concurrent across different aggregates
  • Serial for same aggregate ID
  • Prevents concurrent modifications

Snapshots

Snapshot Type
type Snapshot struct {
    SnapshotID    string
    ObjID         string      // Aggregate or projection ID
    ObjType       string
    ObjVersion    Version     // State version at snapshot
    StreamSeq     uint64      // Global stream position
    CreatedAt     time.Time
    SchemaVersion int         // For migrations
    Encoding      string      // "json"
    Data          []byte      // Serialized state
}
Snapshottable Interface

Custom serialization for aggregates:

type Snapshottable interface {
    Snapshot() (data []byte, err error)
    RestoreSnapshot(data []byte) error
}

// If not implemented, JSON marshaling is used
Snapshotter Interface
type Snapshotter interface {
    SaveSnapshot(ctx, snapshot Snapshot, opts SnapshotSaveOpts) error
    LoadSnapshot(ctx, objType, objID string) (Snapshot, error)
}

// Implementations
es.NewInMemorySnapshotter()           // For testing
es.NewKeyValueSnapshotter(kvStore)    // With KV backend
Using Snapshots
// Manual snapshot on save
repo.Save(ctx, user, es.WithSnapshot(true))

// With TTL
repo.Save(ctx, user, es.WithSnapshot(true), es.WithSnapshotTTL(24*time.Hour))

// Load automatically uses snapshot if available
user, _ := repo.GetByID(ctx, "user-123", es.WithSnapshot(true))

Consumers

Consumer

Event stream processor:

consumer := es.NewConsumer(store, registry, handler,
    es.WithConsumerName("user-projector"),
    es.WithMiddlewares(es.NewLogMiddleware()),
    es.WithShutdownTimeout(10*time.Second),
    es.WithLog(logger),
)

consumer.Start(ctx)  // Blocks, processing events
consumer.Stop()      // Graceful shutdown
Handler Interface
type Handler interface {
    Handle(msgCtx MsgCtx) error
}

// Convenience function
handler := es.Handle(func(m es.MsgCtx) error {
    switch e := m.Event().(type) {
    case *NameChanged:
        // Update projection
    }
    return nil
})
MsgCtx

Event processing context:

type MsgCtx interface {
    Context() context.Context
    Log() *slog.Logger
    Event() any              // Decoded event
    Live() bool              // True after catch-up complete
    Seq() uint64             // Global sequence
    Version() Version        // Aggregate version
    AggregateID() string
    AggregateType() string
    OccurredAt() time.Time
    Envelope() Envelope      // Raw envelope
    Data() json.RawMessage   // Raw JSON
}

Live Mode:

  • Live() == false during replay/catch-up
  • Live() == true for real-time events
  • Useful for skipping side effects during replay
Middleware
// Built-in logging middleware
es.NewLogMiddleware(slog.String("component", "consumer"))

// Custom middleware
func MyMiddleware(next es.Handler) es.Handler {
    return es.Handle(func(m es.MsgCtx) error {
        // Before
        err := next.Handle(m)
        // After
        return err
    })
}

Checkpointing

Track last processed event for exactly-once semantics:

type CpStore interface {
    Get(ctx context.Context) (uint64, error)
    Set(ctx context.Context, seq uint64) error
}

// Implementations
cpStore := es.NewInMemCpStore()

// Use with middleware
consumer := es.NewConsumer(store, registry, handler,
    es.WithMiddlewares(es.NewCheckpointMiddleware(cpStore)),
)

Per-Aggregate Checkpointing:

type AggCpStore interface {
    Get(ctx, projectionName, aggKey string) (Version, error)
    Set(ctx, projectionName, aggKey string, Version) error
}

aggCpStore := es.NewInMemAggCpStore()

Subscriptions

Subscribe to event streams:

type Stream interface {
    Subscribe(ctx context.Context, opts ...SubscribeOption) (Subscription, error)
}

type Subscription interface {
    Cancel()
    MaxSequence() uint64
    Chan() <-chan Envelope
}

Subscribe Options:

es.WithDeliverPolicy(es.DeliverAllPolicy)  // Replay all events
es.WithDeliverPolicy(es.DeliverNewPolicy)  // Only new events
es.WithStartSequence(1000)                 // Resume from sequence
es.WithFilters(es.SubscribeFilter{
    AggregateType: "user",
    AggregateID:   "user-123",
})

Projections

Projection Interface
type Projection interface {
    Name() string
    Handler  // Inherits Handle(msgCtx) error
}
SnapshotProjection

Automatic snapshotting for projections:

type UserIndex struct {
    ByID   map[string]*User
    ByName map[string]*User
}

func (p *UserIndex) Name() string { return "user-index" }

func (p *UserIndex) Handle(m es.MsgCtx) error {
    // Update index
    return nil
}

func (p *UserIndex) Snapshot() ([]byte, error) {
    return json.Marshal(p)
}

func (p *UserIndex) RestoreSnapshot(data []byte) error {
    return json.Unmarshal(data, p)
}

// Wrap with automatic snapshotting
proj, _ := es.NewSnapshotProjection(logger, &UserIndex{}, snapshotter,
    es.WithSnapshotFrequency(100),  // Snapshot every 100 events in live mode
)

consumer := env.NewConsumer(proj)

Environment

Factory for coordinated components:

env := es.NewEnv(
    es.WithStore(store),              // Custom store
    es.WithSnapshotter(snapshotter),  // Custom snapshotter
    es.WithInMemory(),                // Shorthand: in-memory everything
    es.WithLog(logger),
    es.WithCtx(ctx),
    es.WithEvent[NameChanged](),      // Register event types
    es.WithAggregates(&User{}),       // Register aggregates
    es.WithConsumer(handler, opts),   // Pre-configure consumer
    es.WithProjection(projection),    // Pre-configure projection
)

env.Start()  // Starts all consumers
defer env.Shutdown()

// Access components
repo := env.Repository()
store := env.Store()
consumer := env.NewConsumer(handler)

Metrics

The ES package supports pluggable metrics via the ESMetrics interface:

import promadapter "github.com/codewandler/clstr-go/adapters/prometheus"

// Create Prometheus metrics
metrics := promadapter.NewESMetrics(prometheus.DefaultRegisterer)

// Use with environment
env := es.NewEnv(
    es.WithMetrics(metrics),
    // ... other options
)

// Or with repository directly
repo := es.NewRepository(log, store, registry, es.WithMetrics(metrics))

// Or with consumer
consumer := es.NewConsumer(store, registry, handler, es.WithMetrics(metrics))

Available Metrics:

Metric Type Labels Description
clstr_es_store_load_duration_seconds Histogram aggregate_type Event store load latency
clstr_es_store_append_duration_seconds Histogram aggregate_type Event store append latency
clstr_es_events_appended_total Counter aggregate_type Events appended count
clstr_es_repo_load_duration_seconds Histogram aggregate_type Repository load latency
clstr_es_repo_save_duration_seconds Histogram aggregate_type Repository save latency
clstr_es_concurrency_conflicts_total Counter aggregate_type Optimistic lock failures
clstr_es_cache_hits_total Counter aggregate_type Cache hits
clstr_es_cache_misses_total Counter aggregate_type Cache misses
clstr_es_snapshot_load_duration_seconds Histogram aggregate_type Snapshot load latency
clstr_es_snapshot_save_duration_seconds Histogram aggregate_type Snapshot save latency
clstr_es_consumer_event_duration_seconds Histogram event_type, live Event processing time
clstr_es_consumer_events_total Counter event_type, live, success Events processed
clstr_es_consumer_lag Gauge consumer Consumer lag (sequences behind)

If no metrics are provided, a no-op implementation is used (zero overhead).

Error Types

es.ErrAggregateNotFound       // Aggregate doesn't exist
es.ErrConcurrencyConflict     // Version mismatch on save
es.ErrUnknownEventType        // Event type not registered
es.ErrStoreNoEvents           // No events to store
es.ErrSnapshotterUnconfigured // Snapshotter not set
es.ErrSnapshotNotFound        // No snapshot exists
es.ErrCheckpointNotFound      // No checkpoint exists

In-Memory Store

Full-featured implementation for testing:

store := es.NewInMemoryStore()

Features:

  • Thread-safe with mutex protection
  • Optimistic concurrency checking
  • Subscription support with live mode detection
  • Filtering by aggregate type/ID

Complete Example

package main

import (
    "context"
    "log/slog"

    "github.com/codewandler/clstr-go/core/es"
)

// Events
type NameChanged struct{ Name string }
type EmailChanged struct{ Email string }

// Aggregate
type User struct {
    es.BaseAggregate
    Name  string
    Email string
}

func (u *User) GetAggType() string { return "user" }

func (u *User) Register(r es.Registrar) {
    es.RegisterEvents(r, es.Event[NameChanged](), es.Event[EmailChanged]())
}

func (u *User) Apply(evt any) error {
    switch e := evt.(type) {
    case *es.AggregateCreatedEvent:
        return u.BaseAggregate.Apply(evt)
    case *NameChanged:
        u.Name = e.Name
    case *EmailChanged:
        u.Email = e.Email
    }
    return nil
}

func (u *User) ChangeName(name string) error {
    return es.RaiseAndApply(u, &NameChanged{Name: name})
}

func (u *User) ChangeEmail(email string) error {
    return es.RaiseAndApply(u, &EmailChanged{Email: email})
}

// Projection
type UserProjection struct {
    Users map[string]string // id -> name
}

func (p *UserProjection) Handle(m es.MsgCtx) error {
    switch e := m.Event().(type) {
    case *NameChanged:
        p.Users[m.AggregateID()] = e.Name
    }
    return nil
}

func main() {
    ctx := context.Background()
    logger := slog.Default()

    // Create environment
    env := es.NewEnv(
        es.WithInMemory(),
        es.WithLog(logger),
        es.WithAggregates(&User{}),
    )
    env.Start()
    defer env.Shutdown()

    // Create typed repository
    repo := es.NewTypedRepositoryFrom[*User](logger, env.Repository())

    // Create and modify aggregate
    user, _ := repo.Create(ctx, "user-123")
    user.ChangeName("Alice")
    user.ChangeEmail("alice@example.com")
    repo.Save(ctx, user, es.WithSnapshot(true))

    // Load aggregate
    loaded, _ := repo.GetByID(ctx, "user-123")
    slog.Info("loaded user", slog.String("name", loaded.Name))

    // Transactional update
    repo.WithTransaction(ctx, "user-123", func(u *User) error {
        return u.ChangeName("Bob")
    })

    // Create projection consumer
    proj := &UserProjection{Users: make(map[string]string)}
    consumer := env.NewConsumer(es.Handle(proj.Handle))
    go consumer.Start(ctx)

    // Query projection
    // proj.Users["user-123"] == "Bob"
}

Testing

func TestUser(t *testing.T) {
    env := es.NewEnv(es.WithInMemory(), es.WithAggregates(&User{}))
    env.Start()
    defer env.Shutdown()

    repo := es.NewTypedRepositoryFrom[*User](slog.Default(), env.Repository())

    user, err := repo.Create(t.Context(), "test-user")
    require.NoError(t, err)

    require.NoError(t, user.ChangeName("Test"))
    require.NoError(t, repo.Save(t.Context(), user))

    loaded, err := repo.GetByID(t.Context(), "test-user")
    require.NoError(t, err)
    require.Equal(t, "Test", loaded.Name)
}

Key Types Reference

Type Description
Aggregate Domain object interface
BaseAggregate Embeddable base implementation
Version Per-aggregate version number
Envelope Event storage container
EventStore Low-level event persistence
Repository Application-level aggregate operations
TypedRepository[T] Generic type-safe repository
Consumer Event stream processor
MsgCtx Event processing context
Handler Event handler interface
Snapshot Serialized aggregate state
Snapshotter Snapshot persistence
Projection Read model builder
Env Component factory

Documentation

Overview

Package es provides an event sourcing framework for building event-driven applications.

Overview

Event sourcing is a pattern where application state is stored as a sequence of events rather than as current state snapshots. This package provides the core abstractions and implementations for building event-sourced systems in Go.

Core Components

The package provides several key components:

Aggregate: The domain object that encapsulates business logic and state changes. Events are raised within aggregates and applied to update internal state. Use BaseAggregate as an embeddable helper that tracks version and uncommitted events.

type User struct {
    es.BaseAggregate
    Name  string
    Email string
}

func (u *User) ChangeName(name string) error {
    return es.RaiseAndApply(u, &NameChanged{Name: name})
}

EventStore: The persistence layer for events. It provides [EventStore.Load] to retrieve events for an aggregate and [EventStore.Append] to persist new events with optimistic concurrency control. Use NewInMemoryStore for testing or implement the interface for production storage (e.g., NATS JetStream via the adapters/nats package).

Repository: The application-level interface for working with aggregates. It handles loading aggregates by replaying events and saving new events. Use NewTypedRepository for type-safe operations with generics:

repo := es.NewTypedRepository[*User](log, store, registry)
user, err := repo.GetByID(ctx, "user-123")
user.ChangeName("New Name")
repo.Save(ctx, user)

Consumer: Processes events from the store for building read models or triggering side effects. Supports checkpointing for exactly-once semantics and live mode detection to distinguish historical replay from real-time events:

consumer := es.NewConsumer(store, registry, handler,
    es.WithConsumerName("user-projector"),
    es.WithMiddlewares(es.NewCheckpointMiddleware(checkpointStore)),
)
consumer.Start(ctx)

Event Registration

Events must be registered with an EventRegistry before they can be decoded:

registry := es.NewEventRegistry()
registry.Register(es.Event[NameChanged]())
registry.Register(es.Event[EmailChanged]())

Snapshots

For aggregates with many events, snapshots can optimize loading by capturing state at a point in time. Implement Snapshottable for custom serialization, or let the framework use JSON marshaling as a fallback:

// Load with snapshot optimization
user, err := repo.GetByID(ctx, "user-123", es.WithSnapshot(true))

// Save with snapshot
repo.Save(ctx, user, es.WithSnapshot(true))

Concurrency Control

The framework uses optimistic concurrency via the Version type. When saving, the repository checks that the aggregate's version matches the store's version. If another process has modified the aggregate, ErrConcurrencyConflict is returned.

For serialized access to a single aggregate, use [TypedRepository.WithTransaction]:

repo.WithTransaction(ctx, "user-123", func(user *User) error {
    return user.ChangeName("New Name")
})

Environment

The [Environment] type provides a factory for creating configured instances of stores, repositories, and consumers with shared configuration:

env := es.NewEnvironment(
    es.WithLog(logger),
    es.WithStore(natsStore),
    es.WithEvent[NameChanged](),
    es.WithAggregates(&User{}),
)
repo := env.Repository()
consumer := env.NewConsumer(handler)

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAggregateNotFound   = errors.New("aggregate not found")
	ErrConcurrencyConflict = errors.New("concurrency conflict")
	ErrUnknownEventType    = errors.New("unknown event type")
)
View Source
var (
	ErrSnapshotterUnconfigured = errors.New("no snapshotter configured")
	ErrSnapshotNotFound        = errors.New("snapshot not found")
)
View Source
var (
	ErrCheckpointNotFound = errors.New("checkpoint not found")
)
View Source
var (
	ErrStoreNoEvents = errors.New("no events to store")
)

Functions

func ApplySnapshot

func ApplySnapshot(ctx context.Context, snapshotter Snapshotter, agg Aggregate) (err error)

func Event

func Event[T any]() func() any

Event returns a reflection-free constructor for an event of type T. Each call to the returned function constructs a fresh *T via new(T).

func RaiseAndApply

func RaiseAndApply(a raiseApplier, events ...any) (err error)

RaiseAndApply records e as uncommitted and applies it to mutate state.

func RaiseAndApplyD

func RaiseAndApplyD(a Aggregate, events ...any) func() error

RaiseAndApplyD returns a deferred function that calls RaiseAndApply. This is useful with BaseAggregate.Checked for conditional event application:

return b.Checked(
    assert.NotEmpty(user.Email, "email required"),
    es.RaiseAndApplyD(user, &EmailChanged{Email: newEmail}),
)

func RegisterEventFor

func RegisterEventFor[T any](r Registrar)

func RegisterEvents

func RegisterEvents(r Registrar, ctors ...func() any)

RegisterEvents registers event constructors. It does not use reflection to create instances. For each provided constructor, we call it once to determine the event type name and then register the original constructor so future decodes produce fresh instances per call.

Types

type AggCpStore

type AggCpStore interface {
	Get(ctx context.Context, projectionName, aggKey string) (lastVersion Version, err error)
	Set(ctx context.Context, projectionName, aggKey string, lastVersion Version) error
}

func NewNoopCpStore

func NewNoopCpStore() AggCpStore

type AggCpStoreOption

type AggCpStoreOption valueOption[AggCpStore]

func WithAggregateCheckpointStore

func WithAggregateCheckpointStore(cps AggCpStore) AggCpStoreOption

type Aggregate

type Aggregate interface {
	// GetAggType returns the aggregate type name used for stream identification.
	GetAggType() string
	// GetID returns the unique identifier of this aggregate instance.
	GetID() string
	// SetID sets the aggregate ID. Typically called during creation.
	SetID(string)

	// GetVersion returns the current version (number of events applied).
	GetVersion() Version

	// GetSeq returns the global stream sequence of the last applied event.
	GetSeq() uint64

	// Create initializes a new aggregate with the given ID.
	Create(id string) error

	// Register registers event types with the provided Registrar.
	Register(r Registrar)
	// Raise records an event as uncommitted without applying it.
	Raise(event any)
	// Apply updates the aggregate state from an event.
	Apply(event any) error

	// Uncommitted returns a copy of events raised but not yet persisted.
	Uncommitted() []any
	// ClearUncommitted removes all uncommitted events after successful save.
	ClearUncommitted()
	// contains filtered or unexported methods
}

Aggregate is the core interface for event-sourced domain objects. It defines the contract that all aggregate roots must implement to work with the Repository for loading and persisting state through events.

An aggregate maintains:

  • Identity: type and ID that uniquely identify the aggregate stream
  • Version: the current version for optimistic concurrency control
  • Sequence: the global stream sequence number of the last applied event
  • Uncommitted events: events raised but not yet persisted

The typical lifecycle is:

  1. Create a new aggregate or load an existing one via Repository
  2. Execute domain logic that calls Raise() to record events
  3. Apply() is called to update internal state from each event
  4. Save via Repository which persists uncommitted events and calls ClearUncommitted()

type AggregateCreatedEvent

type AggregateCreatedEvent struct {
	ID        string    `json:"id"`
	CreatedAt time.Time `json:"created_at"`
}

func (AggregateCreatedEvent) Validate

func (e AggregateCreatedEvent) Validate() error

type AggregateDeleted

type AggregateDeleted struct{}

AggregateDeleted is an event that marks an aggregate as deleted. Use this as a soft-delete marker that can be detected by projections.

type AggregateOption

type AggregateOption struct {
	// contains filtered or unexported fields
}

func WithAggregates

func WithAggregates(a ...Aggregate) AggregateOption

type Applier

type Applier interface {
	Apply(event any) error
}

Applier is the interface for types that can apply events to update their state.

type BaseAggregate

type BaseAggregate struct {
	CreatedAt time.Time `json:"created_at"`
	// contains filtered or unexported fields
}

BaseAggregate is an embeddable helper that tracks version + uncommitted events.

func (*BaseAggregate) Apply

func (b *BaseAggregate) Apply(evt any) error

func (*BaseAggregate) Checked

func (b *BaseAggregate) Checked(c assert.Cond, thenFunc func() error) error

func (*BaseAggregate) ClearUncommitted

func (b *BaseAggregate) ClearUncommitted()

func (*BaseAggregate) Create

func (b *BaseAggregate) Create(id string) error

func (*BaseAggregate) GetCreatedAt

func (b *BaseAggregate) GetCreatedAt() time.Time

func (*BaseAggregate) GetID

func (b *BaseAggregate) GetID() string

func (*BaseAggregate) GetSeq

func (b *BaseAggregate) GetSeq() uint64

func (*BaseAggregate) GetVersion

func (b *BaseAggregate) GetVersion() Version

func (*BaseAggregate) IsCreated

func (b *BaseAggregate) IsCreated() bool

func (*BaseAggregate) Raise

func (b *BaseAggregate) Raise(event any)

Raise records an event as uncommitted. (Typically you call Raise+Apply together via a helper like ApplyNew below.)

func (*BaseAggregate) SetID

func (b *BaseAggregate) SetID(id string)

func (*BaseAggregate) Uncommitted

func (b *BaseAggregate) Uncommitted() []any

type Checkpoint

type Checkpoint interface {
	// GetLastSeq returns the sequence number of the last successfully processed event.
	// Returns ErrCheckpointNotFound if no checkpoint exists.
	GetLastSeq() (uint64, error)
}

Checkpoint is implemented by handlers that track their processing progress. When a handler implements this interface, the Consumer will use GetLastSeq() to determine where to resume processing after a restart.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer processes events from an EventStore by subscribing to the stream and dispatching events to a Handler. It supports checkpointing for exactly-once processing semantics and live mode detection for distinguishing between historical replay and real-time events.

func NewConsumer

func NewConsumer(
	store EventStore,
	decoder Decoder,
	handler Handler,
	opts ...ConsumerOption,
) *Consumer

func (*Consumer) Start

func (c *Consumer) Start(ctx context.Context) error

func (*Consumer) Stop

func (c *Consumer) Stop()

type ConsumerNameOption

type ConsumerNameOption valueOption[string]

func WithConsumerName

func WithConsumerName(name string) ConsumerNameOption

type ConsumerOption

type ConsumerOption interface {
	// contains filtered or unexported methods
}

type ConsumerOptions

type ConsumerOptions MultiOption[ConsumerOption]

func WithConsumerOpts

func WithConsumerOpts(opts ...ConsumerOption) ConsumerOptions

type ConsumerShutdownTimeoutOption

type ConsumerShutdownTimeoutOption valueOption[time.Duration]

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) ConsumerShutdownTimeoutOption

WithShutdownTimeout sets the timeout for handler shutdown when the consumer stops. Default is 5 seconds.

type ContextOption

type ContextOption struct {
	// contains filtered or unexported fields
}

func WithCtx

func WithCtx(ctx context.Context) ContextOption

type CpStore

type CpStore interface {
	Get(ctx context.Context) (lastSeq uint64, err error)
	Set(ctx context.Context, lastSeq uint64) error
}

type CpStoreOption

type CpStoreOption valueOption[CpStore]

func WithCheckpointStore

func WithCheckpointStore(cps CpStore) CpStoreOption

type Decoder

type Decoder interface{ Decode(e Envelope) (any, error) }

type DeliverPolicy

type DeliverPolicy string
const (
	DeliverAllPolicy DeliverPolicy = "all"
	DeliverNewPolicy DeliverPolicy = "new"
)

type ESMetrics

type ESMetrics interface {
	// Store operations
	StoreLoadDuration(aggType string) metrics.Timer
	StoreAppendDuration(aggType string) metrics.Timer
	EventsAppended(aggType string, count int)

	// Repository operations
	RepoLoadDuration(aggType string) metrics.Timer
	RepoSaveDuration(aggType string) metrics.Timer
	ConcurrencyConflict(aggType string)

	// Cache
	CacheHit(aggType string)
	CacheMiss(aggType string)

	// Snapshots
	SnapshotLoadDuration(aggType string) metrics.Timer
	SnapshotSaveDuration(aggType string) metrics.Timer

	// Consumer
	ConsumerEventDuration(eventType string, live bool) metrics.Timer
	ConsumerEventProcessed(eventType string, live bool, success bool)
	ConsumerLag(consumer string, lag int64)
}

ESMetrics defines the metrics interface for the Event Sourcing pillar. All methods return Timer or increment counters; implementations should be thread-safe.

func NopESMetrics

func NopESMetrics() ESMetrics

NopESMetrics returns a no-op ESMetrics implementation.

type ESMetricsOption

type ESMetricsOption struct {
	// contains filtered or unexported fields
}

ESMetricsOption sets the metrics for ES components.

func WithMetrics

func WithMetrics(m ESMetrics) ESMetricsOption

WithMetrics sets the metrics implementation for ES components.

type Env

type Env struct {
	// contains filtered or unexported fields
}

func NewEnv

func NewEnv(opts ...EnvOption) (e *Env)

func (*Env) Append

func (e *Env) Append(ctx context.Context, aggType string, aggID string, expect Version, events ...any) error

func (*Env) NewConsumer

func (e *Env) NewConsumer(handler Handler, opts ...ConsumerOption) *Consumer

func (*Env) Repository

func (e *Env) Repository() Repository

func (*Env) Shutdown

func (e *Env) Shutdown()

func (*Env) Snapshotter

func (e *Env) Snapshotter() Snapshotter

func (*Env) Start

func (e *Env) Start() (err error)

func (*Env) Store

func (e *Env) Store() EventStore

type EnvConsumerOption

type EnvConsumerOption struct {
	// contains filtered or unexported fields
}

func WithConsumer

func WithConsumer(handler Handler, opts ...ConsumerOption) EnvConsumerOption

func WithProjection

func WithProjection(projection Projection, opts ...ConsumerOption) EnvConsumerOption

type EnvOption

type EnvOption interface {
	// contains filtered or unexported methods
}

type EnvOpts

type EnvOpts MultiOption[EnvOption]

func WithEnvOpts

func WithEnvOpts(opts ...EnvOption) EnvOpts

type Envelope

type Envelope struct {
	// ID is the unique identifier of this event envelope.
	ID string `json:"id"`
	// Seq is the global sequence number assigned by the store.
	// This provides total ordering across all events in the store.
	Seq uint64 `json:"seq"`
	// Version is the per-aggregate stream version (1, 2, 3, ...).
	// Used for optimistic concurrency control.
	Version Version `json:"version"`
	// AggregateType identifies the type of aggregate this event belongs to.
	AggregateType string `json:"aggregate"`
	// AggregateID identifies the specific aggregate instance.
	AggregateID string `json:"aggregate_id"`
	// Type is the event type name for deserialization routing.
	Type string `json:"type"`
	// OccurredAt is when the event was created.
	OccurredAt time.Time `json:"occurred_at"`
	// Data contains the JSON-encoded event payload.
	Data json.RawMessage `json:"data"`
}

Envelope wraps an event with metadata for persistence and routing. It is the unit of storage in the EventStore and contains all information needed to reconstruct and route events during replay or consumption.

func (Envelope) Validate

func (e Envelope) Validate() error

type EventRegisterOption

type EventRegisterOption struct {
	// contains filtered or unexported fields
}

func WithEvent

func WithEvent[T any]() EventRegisterOption

type EventRegistry

type EventRegistry struct {
	// contains filtered or unexported fields
}

EventRegistry maps event type names to constructors so we can decode persisted events.

func NewRegistry

func NewRegistry() *EventRegistry

func (*EventRegistry) Decode

func (r *EventRegistry) Decode(env Envelope) (any, error)

func (*EventRegistry) Register

func (r *EventRegistry) Register(eventType string, ctor func() any)

type EventStore

type EventStore interface {
	// Stream provides subscription capabilities for consumers.
	Stream

	// Load retrieves events for a specific aggregate stream.
	// Events are returned in version order. Use opts to filter by version/sequence.
	Load(ctx context.Context, aggType string, aggID string, opts ...StoreLoadOption) ([]Envelope, error)

	// Append persists events to an aggregate stream with optimistic concurrency.
	// Returns ErrConcurrencyConflict if expectedVersion doesn't match the current version.
	Append(ctx context.Context, aggType string, aggID string, expectedVersion Version, events []Envelope) (*StoreAppendResult, error)
}

EventStore is the persistence interface for event sourcing. It provides operations for loading and appending events to aggregate streams, as well as subscribing to the global event stream for consumers.

Implementations must guarantee:

  • Atomic append of multiple events within a single Append call
  • Optimistic concurrency via expectedVersion check
  • Ordered delivery of events by sequence number in subscriptions

type HandleFunc

type HandleFunc func(ctx MsgCtx) error

func Handle

func Handle(f HandleFunc) HandleFunc

func (HandleFunc) Handle

func (f HandleFunc) Handle(ctx MsgCtx) error

type Handler

type Handler interface{ Handle(msgCtx MsgCtx) error }

type HandlerLifecycleShutdown

type HandlerLifecycleShutdown interface {
	Shutdown(ctx context.Context) error
}

type HandlerLifecycleStart

type HandlerLifecycleStart interface {
	Start(ctx context.Context) error
}

type HandlerMiddleware

type HandlerMiddleware func(next Handler) Handler

func NewCheckpointMiddleware

func NewCheckpointMiddleware(cp CpStore) HandlerMiddleware

func NewLogMiddleware

func NewLogMiddleware(attrs ...any) HandlerMiddleware

type IDGenerator

type IDGenerator func() string

IDGenerator is a function that generates unique IDs for events.

func DefaultIDGenerator

func DefaultIDGenerator() IDGenerator

DefaultIDGenerator returns the default ID generator using nanoid.

type InMemAggCpStore

type InMemAggCpStore struct {
	// contains filtered or unexported fields
}

func NewInMemAggCpStore

func NewInMemAggCpStore() *InMemAggCpStore

func (*InMemAggCpStore) Get

func (s *InMemAggCpStore) Get(_ context.Context, proj, aggID string) (Version, error)

func (*InMemAggCpStore) Set

func (s *InMemAggCpStore) Set(_ context.Context, proj, aggID string, v Version) error

type InMemCpStore

type InMemCpStore struct {
	// contains filtered or unexported fields
}

func NewInMemCpStore

func NewInMemCpStore() *InMemCpStore

func (*InMemCpStore) Get

func (s *InMemCpStore) Get(_ context.Context) (uint64, error)

func (*InMemCpStore) Set

func (s *InMemCpStore) Set(_ context.Context, v uint64) error

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore is a simple, correct (optimistic) store for tests/dev.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

func (*InMemoryStore) Append

func (s *InMemoryStore) Append(
	_ context.Context,
	aggType string,
	aggID string,
	expectVersion Version,
	events []Envelope,
) (*StoreAppendResult, error)

func (*InMemoryStore) Load

func (s *InMemoryStore) Load(
	_ context.Context,
	aggType,
	aggID string,
	opts ...StoreLoadOption,
) ([]Envelope, error)

func (*InMemoryStore) Subscribe

func (s *InMemoryStore) Subscribe(ctx context.Context, opts ...SubscribeOption) (Subscription, error)

type KeyValueSnapshotter

type KeyValueSnapshotter struct {
	// contains filtered or unexported fields
}

func NewInMemorySnapshotter

func NewInMemorySnapshotter() *KeyValueSnapshotter

func NewKeyValueSnapshotter

func NewKeyValueSnapshotter(store kv.Store) *KeyValueSnapshotter

func (*KeyValueSnapshotter) LoadSnapshot

func (k *KeyValueSnapshotter) LoadSnapshot(ctx context.Context, objType, objID string) (snap Snapshot, err error)

func (*KeyValueSnapshotter) SaveSnapshot

func (k *KeyValueSnapshotter) SaveSnapshot(ctx context.Context, snapshot Snapshot, opts SnapshotSaveOpts) error

type LoadAndSaveOption

type LoadAndSaveOption interface {
	// contains filtered or unexported methods
}

type LoadOption

type LoadOption interface {
	// contains filtered or unexported methods
}

type LoadOptsOption

type LoadOptsOption MultiOption[LoadOption]

func WithLoadOpts

func WithLoadOpts(opts ...LoadOption) LoadOptsOption

type LogOption

type LogOption struct {
	// contains filtered or unexported fields
}

func WithLog

func WithLog(l *slog.Logger) LogOption

type MemoryOption

type MemoryOption struct{}

func WithInMemory

func WithInMemory() MemoryOption

type MiddlewareHandleFunc

type MiddlewareHandleFunc func(ctx MsgCtx, next Handler) error

type MiddlewareOption

type MiddlewareOption valueOption[[]HandlerMiddleware]

func WithMiddlewares

func WithMiddlewares(mws ...HandlerMiddleware) MiddlewareOption

func WithMiddlewaresAppend

func WithMiddlewaresAppend(mws ...HandlerMiddleware) MiddlewareOption

type MsgCtx

type MsgCtx struct {
	// contains filtered or unexported fields
}

MsgCtx provides context for handling a single event. It wraps the event envelope with additional metadata about the processing context, including whether the consumer is in live mode (processing real-time events) or catching up on historical events.

func (*MsgCtx) AggregateID

func (c *MsgCtx) AggregateID() string

func (*MsgCtx) AggregateType

func (c *MsgCtx) AggregateType() string

func (*MsgCtx) Context

func (c *MsgCtx) Context() context.Context

func (*MsgCtx) Data

func (c *MsgCtx) Data() json.RawMessage

func (*MsgCtx) Envelope

func (c *MsgCtx) Envelope() Envelope

func (*MsgCtx) Event

func (c *MsgCtx) Event() any

func (*MsgCtx) Live

func (c *MsgCtx) Live() bool

func (*MsgCtx) Log

func (c *MsgCtx) Log() *slog.Logger

func (*MsgCtx) OccurredAt

func (c *MsgCtx) OccurredAt() time.Time

func (*MsgCtx) Seq

func (c *MsgCtx) Seq() uint64

func (*MsgCtx) Type

func (c *MsgCtx) Type() string

func (*MsgCtx) Version

func (c *MsgCtx) Version() Version

type MultiOption

type MultiOption[T any] struct {
	// contains filtered or unexported fields
}

type NoopAggCpStore

type NoopAggCpStore struct{}

func (NoopAggCpStore) Get

func (NoopAggCpStore) Set

type Projection

type Projection interface {
	Name() string
	Handler
}

Projection consumes persisted events to build read models / indexes.

type Registrar

type Registrar interface {
	Register(eventType string, ctor func() any)
}

type RepoCacheOption

type RepoCacheOption valueOption[cache.Cache]

func WithRepoCache

func WithRepoCache(cache cache.Cache) RepoCacheOption

func WithRepoCacheLRU

func WithRepoCacheLRU(size int) RepoCacheOption

type RepoCreateOption

type RepoCreateOption valueOption[bool]

func WithCreate

func WithCreate() RepoCreateOption

type RepoIDGeneratorOption

type RepoIDGeneratorOption valueOption[IDGenerator]

func WithIDGenerator

func WithIDGenerator(gen IDGenerator) RepoIDGeneratorOption

WithIDGenerator sets a custom ID generator for event envelope IDs.

type RepoUseCacheOption

type RepoUseCacheOption valueOption[bool]

func WithUseCache

func WithUseCache(useCache bool) RepoUseCacheOption

type Repository

type Repository interface {
	Load(ctx context.Context, agg Aggregate, opts ...LoadOption) error
	Save(ctx context.Context, agg Aggregate, opts ...SaveOption) error
	CreateSnapshot(ctx context.Context, agg Aggregate, saveSnapshotOpts SnapshotSaveOpts) (Snapshot, error)
}

func NewRepository

func NewRepository(
	log *slog.Logger,
	store EventStore,
	registry *EventRegistry,
	opts ...RepositoryOption,
) Repository

type RepositoryOption

type RepositoryOption interface {
	// contains filtered or unexported methods
}

type SaveOption

type SaveOption interface {
	// contains filtered or unexported methods
}

type SaveOptsOption

type SaveOptsOption MultiOption[SaveOption]

func WithSaveOpts

func WithSaveOpts(opts ...SaveOption) SaveOptsOption

type Snapshot

type Snapshot struct {
	// SnapshotID is the unique identifier of this snapshot.
	SnapshotID string `json:"snapshot_id"`

	// ObjID is the identifier of the snapshotted object (aggregate ID or projection name).
	ObjID string `json:"obj_id"`
	// ObjType is the type of the snapshotted object (aggregate type or "projection").
	ObjType string `json:"obj_type"`
	// ObjVersion is the version of the object at the time of snapshot.
	ObjVersion Version `json:"obj_version"`

	// StreamSeq is the global stream sequence number at the time of snapshot.
	// When restoring, events after this sequence are replayed.
	StreamSeq uint64 `json:"stream_seq"`

	// CreatedAt is when this snapshot was created.
	CreatedAt time.Time `json:"created_at"`
	// SchemaVersion tracks the snapshot format for migrations.
	SchemaVersion int `json:"schema_version"`
	// Encoding indicates how Data is encoded (typically "json").
	Encoding string `json:"encoding"`
	// Data contains the serialized state.
	Data []byte `json:"data"`
}

Snapshot represents a point-in-time capture of an aggregate or projection state. Snapshots optimize loading by allowing the system to restore state directly instead of replaying all events from the beginning.

func CreateSnapshot

func CreateSnapshot(agg Aggregate) (ss Snapshot, err error)

func LoadSnapshot

func LoadSnapshot(
	ctx context.Context,
	snapshotter Snapshotter,
	aggType, aggID string,
) (ss Snapshot, err error)

type SnapshotOption

type SnapshotOption valueOption[bool]

func WithSnapshot

func WithSnapshot(b bool) SnapshotOption

type SnapshotProjection

type SnapshotProjection[T SnapshottableProjection] struct {
	// contains filtered or unexported fields
}

SnapshotProjection wraps a SnapshottableProjection and provides automatic snapshotting at configurable intervals during live mode.

func NewSnapshotProjection

func NewSnapshotProjection[T SnapshottableProjection](
	log *slog.Logger,
	innerProjection T,
	snapshotter Snapshotter,
	opts ...SnapshotProjectionOption,
) (*SnapshotProjection[T], error)

func (*SnapshotProjection[T]) GetLastSeq

func (p *SnapshotProjection[T]) GetLastSeq() (uint64, error)

func (*SnapshotProjection[T]) Handle

func (p *SnapshotProjection[T]) Handle(msgCtx MsgCtx) error

func (*SnapshotProjection[T]) Name

func (p *SnapshotProjection[T]) Name() string

func (*SnapshotProjection[T]) Projection

func (p *SnapshotProjection[T]) Projection() T

func (*SnapshotProjection[T]) Shutdown

func (p *SnapshotProjection[T]) Shutdown(ctx context.Context) error

func (*SnapshotProjection[T]) Start

func (p *SnapshotProjection[T]) Start(ctx context.Context) error

type SnapshotProjectionOption

type SnapshotProjectionOption interface {
	// contains filtered or unexported methods
}

SnapshotProjectionOption configures a SnapshotProjection.

func WithSnapshotFrequency

func WithSnapshotFrequency(n uint64) SnapshotProjectionOption

WithSnapshotFrequency sets how often (in events) snapshots are taken in live mode. Default is 10 events.

type SnapshotSaveOpts

type SnapshotSaveOpts struct {
	// TTL sets the time-to-live for the snapshot. Zero means no expiration.
	TTL time.Duration
}

SnapshotSaveOpts configures snapshot persistence.

type SnapshotTTLOption

type SnapshotTTLOption valueOption[time.Duration]

func WithSnapshotTTL

func WithSnapshotTTL(ttl time.Duration) SnapshotTTLOption

type Snapshottable

type Snapshottable interface {
	// Snapshot serializes the current state to bytes.
	Snapshot() (data []byte, err error)
	// RestoreSnapshot restores state from previously serialized bytes.
	RestoreSnapshot(data []byte) error
}

Snapshottable is implemented by types that support custom snapshot serialization. If not implemented, JSON marshaling is used as a fallback.

type SnapshottableProjection

type SnapshottableProjection interface {
	Projection
	Snapshottable
}

type Snapshotter

type Snapshotter interface {
	// SaveSnapshot persists a snapshot with optional TTL.
	SaveSnapshot(ctx context.Context, snapshot Snapshot, opts SnapshotSaveOpts) error
	// LoadSnapshot retrieves the latest snapshot for an object.
	// Returns ErrSnapshotNotFound if no snapshot exists.
	LoadSnapshot(ctx context.Context, objType, objID string) (Snapshot, error)
}

Snapshotter provides storage operations for snapshots.

type SnapshotterOption

type SnapshotterOption valueOption[Snapshotter]

func WithSnapshotter

func WithSnapshotter(s Snapshotter) SnapshotterOption

type StartSeqOption

type StartSeqOption valueOption[uint64]

func WithStartSeq

func WithStartSeq(startSeq uint64) StartSeqOption

func (StartSeqOption) ApplyToStoreLoadOptions

func (o StartSeqOption) ApplyToStoreLoadOptions(receiver storeLoadOptionsReceiver)

type StoreAppendResult

type StoreAppendResult struct {
	// LastSeq is the global sequence number assigned to the last appended event.
	LastSeq uint64
}

StoreAppendResult contains the result of appending events to the store.

func AppendEvents

func AppendEvents(
	ctx context.Context,
	store EventStore,
	aggType string,
	aggID string,
	expect Version,
	events ...any,
) (*StoreAppendResult, error)

type StoreLoadOption

type StoreLoadOption interface {
	ApplyToStoreLoadOptions(storeLoadOptionsReceiver)
}

func WithStartAtVersion

func WithStartAtVersion(startVersion Version) StoreLoadOption

type StoreOption

type StoreOption valueOption[EventStore]

func WithStore

func WithStore(s EventStore) StoreOption

type Stream

type Stream interface {
	Subscribe(ctx context.Context, opts ...SubscribeOption) (Subscription, error)
}

type SubscribeFilter

type SubscribeFilter struct {
	AggregateType string
	AggregateID   string
}

type SubscribeOption

type SubscribeOption func(opts *SubscribeOpts)

func WithDeliverPolicy

func WithDeliverPolicy(policy DeliverPolicy) SubscribeOption

func WithFilters

func WithFilters(filters ...SubscribeFilter) SubscribeOption

func WithStartSequence

func WithStartSequence(startSequence uint64) SubscribeOption

type SubscribeOpts

type SubscribeOpts struct {
	// contains filtered or unexported fields
}

func NewSubscribeOpts

func NewSubscribeOpts(opts ...SubscribeOption) SubscribeOpts

func (*SubscribeOpts) DeliverPolicy

func (s *SubscribeOpts) DeliverPolicy() DeliverPolicy

func (*SubscribeOpts) Filters

func (s *SubscribeOpts) Filters() []SubscribeFilter

func (*SubscribeOpts) StartSequence

func (s *SubscribeOpts) StartSequence() uint64

type Subscription

type Subscription interface {
	Cancel()
	MaxSequence() uint64
	Chan() <-chan Envelope
}

type TestingEnv

type TestingEnv struct {
	*Env
	// contains filtered or unexported fields
}

func StartTestEnv

func StartTestEnv(
	t *testing.T,
	opts ...EnvOption,
) *TestingEnv

func (*TestingEnv) Assert

func (e *TestingEnv) Assert() *TestingEnvAssert

type TestingEnvAssert

type TestingEnvAssert struct {
	// contains filtered or unexported fields
}

func (*TestingEnvAssert) Append

func (t *TestingEnvAssert) Append(
	ctx context.Context,
	expect Version,
	aggType string,
	aggID string,
	events ...any,
)

type TypedRepository

type TypedRepository[T Aggregate] interface {
	GetAggType() string
	New() T
	NewWithID(id string) T
	Create(ctx context.Context, aggID string, opts ...SaveOption) (agg T, err error)
	GetOrCreate(ctx context.Context, aggID string, opts ...LoadAndSaveOption) (agg T, err error)

	// GetByID gets an aggregate by ID. If the aggregate does not exist, it is created.
	GetByID(ctx context.Context, aggID string, opts ...LoadOption) (T, error)

	WithTransaction(ctx context.Context, aggID string, do func(T) error, opts ...WithTransactionOption) error

	Save(ctx context.Context, agg T, opts ...SaveOption) error
}

func NewTypedRepository

func NewTypedRepository[T Aggregate](log *slog.Logger, s EventStore, reg *EventRegistry, opts ...RepositoryOption) TypedRepository[T]

func NewTypedRepositoryFrom

func NewTypedRepositoryFrom[T Aggregate](log *slog.Logger, r Repository, opts ...RepositoryOption) TypedRepository[T]

type Version

type Version uint64

Version represents the version number of an aggregate within its stream. It is a monotonically increasing value starting from 1 for the first event. Version is used for optimistic concurrency control - when saving changes, the expected version must match the current version in the store.

func (Version) SlogAttr

func (v Version) SlogAttr() slog.Attr

func (Version) SlogAttrWithKey

func (v Version) SlogAttrWithKey(key string) slog.Attr

func (Version) Uint64

func (v Version) Uint64() uint64

type WithTransactionOption

type WithTransactionOption interface {
	// contains filtered or unexported methods
}

Directories

Path Synopsis
estests

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL