aggregate

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2025 License: BSD-3-Clause Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrRootNotFound = errors.New("root not found")

Functions

func ApplyReadTransformers added in v0.5.0

func ApplyReadTransformers[E event.Any](
	ctx context.Context,
	events []E,
	transformers []event.Transformer[E],
) ([]E, error)

func ApplyWriteTransformers added in v0.5.0

func ApplyWriteTransformers[E event.Any](
	ctx context.Context,
	events []E,
	transformers []event.Transformer[E],
) ([]E, error)

func LoadFromRecords

func LoadFromRecords[TID ID, E event.Any](
	ctx context.Context,
	root Root[TID, E],
	registry event.Registry[E],
	decoder encoding.Decoder,
	transformers []event.Transformer[E],
	records event.Records,
) error

LoadFromRecords hydrates an aggregate root from an iterator of event records. For each record, it decodes the data into a concrete event type, applies any transformations, and then applies the event to the root.

Usage (used by `ReadAndLoadFromStore` and snapshot loading logic):

err := aggregate.LoadFromRecords(ctx, root, registry, ser, transformers, records)

Returns an error if decoding, transformation, or application of an event fails.

func LoadFromSnapshot

func LoadFromSnapshot[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]](
	ctx context.Context,
	store SnapshotStore[TID, TS],
	snapshotter Snapshotter[TID, E, R, TS],
	aggregateID TID,
) (R, bool, error)

LoadFromSnapshot orchestrates the retrieval and rehydration of an aggregate from a snapshot. It fetches the latest snapshot from the store and uses the snapshotter to convert it back into a live aggregate root instance, with its version correctly set.

Usage:

// Typically used within a repository's Get method.
root, found, err := aggregate.LoadFromSnapshot(
    ctx,
    snapshotStore,
    snapshotter,
    aggregateID,
)
if err != nil {
    return nil, err
}
if found {
    // The aggregate is partially loaded from the snapshot.
    // Now, load subsequent events from the event log.
} else {
    // No snapshot found, load from the beginning of the event log.
}

Returns the rehydrated aggregate root, a boolean indicating if a snapshot was found, and any error that occurred during the process.

func RawEventsFromUncommitted

func RawEventsFromUncommitted[E event.Any](
	ctx context.Context,
	encoder encoding.Encoder,
	transformers []event.Transformer[E],
	uncommitted UncommittedEvents[E],
) ([]event.Raw, error)

RawEventsFromUncommitted converts a slice of strongly-typed uncommitted events into a slice of `event.Raw` events. It applies write-side transformers and encodes the event data during this process.

Usage (called by `CommitEvents`):

rawEvents, err := aggregate.RawEventsFromUncommitted(ctx, encoder, transformers, uncommitted)

Returns a slice of `event.Raw` ready for storage, or an error if transformation or encoding fails.

func ReadAndLoadFromStore

func ReadAndLoadFromStore[TID ID, E event.Any](
	ctx context.Context,
	root Root[TID, E],
	store event.Log,
	registry event.Registry[E],
	decoder encoding.Decoder,
	transformers []event.Transformer[E],
	id TID,
	selector version.Selector,
) error

ReadAndLoadFromStore is a framework helper that orchestrates loading an aggregate. It reads event records from the event store and uses them to hydrate a new instance of an aggregate root.

Usage (typically inside a repository's Get method):

err := aggregate.ReadAndLoadFromStore(ctx, root, store, registry, ser, transformers, id, selector)

Returns an error if reading from the store, decoding, or applying any event fails. Returns an error if the aggregate is not found (i.e., has no events).

func RecordEvent

func RecordEvent[TID ID, E event.Any](root Root[TID, E], e E) error

RecordEvent is the primary function for recording a new event against an aggregate. It first applies the event to the aggregate to update its state, and upon success, adds the event to an internal list of uncommitted events to be persisted later.

A recommended pattern is to create a private, type-safe wrapper for this function on your aggregate. This improves type safety and autocompletion within your command methods.

Usage:

// 1. Define a private helper method on your aggregate that calls RecordEvent.
//    Notice the parameter is `AccountEvent`, your specific event sum type.
func (a *Account) recordThat(event AccountEvent) error {
	return aggregate.RecordEvent(a, event)
}

// 2. Use this helper in your command methods. The Go compiler will now ensure
//    you only pass valid AccountEvent types.
func (a *Account) DepositMoney(amount int) error {
	if amount <= 0 {
		return errors.New("amount must be positive")
	}
	return a.recordThat(&moneyDeposited{Amount: amount})
}

Returns an error if the aggregate's `Apply` method returns an error.

func RecordEvents

func RecordEvents[TID ID, E event.Any](root Root[TID, E], events ...E) error

RecordEvents is a convenience function to record multiple events in a single call. Each event is applied and recorded sequentially.

Usage (inside a command method on your aggregate):

return aggregate.RecordEvents(a, &eventOne{}, &eventTwo{})

Returns an error if any of the `Apply` calls fail.

func SnapPolicyFor added in v0.5.0

func SnapPolicyFor[R Root[TID, E], TID ID, E event.Any]() *policyBuilder[TID, E, R]

SnapPolicyFor provides a fluent builder for creating common snapshot policies.

Usage:

// Create a policy that snapshots every 10 events.
policy :=  aggregate.SnapPolicyFor[*account.Account]().EveryNEvents(10)

// Create a composite policy.
policy2 := aggregate.SnapPolicyFor[*account.Account]().AnyOf(...)

Types

type Aggregate

type Aggregate[TID ID, E event.Any] interface {
	Apply(E) error
	IDer[TID]
}

Aggregate defines the core behavior of an event-sourced aggregate. It must be able to change its state by applying an event and must provide its unique ID.

type AggregateLoader

type AggregateLoader[TID ID, E event.Any, R Root[TID, E]] interface {
	LoadAggregate(
		ctx context.Context,
		root R,
		id TID,
		selector version.Selector,
	) error
}

AggregateLoader is an interface for loading events and applying them to an *existing* instance of an aggregate root. This is primarily used by the snapshotting mechanism, which first creates an aggregate from a snapshot, then uses this interface to load and apply only the new events that occurred after the snapshot was taken.

Usage (internal, used by repository with snapshots):

// repo implements AggregateLoader
// root is an aggregate created from a snapshot at version 10
err := repo.LoadAggregate(ctx, root, "id-123", version.Selector{From: 11})

Returns an error if loading or applying subsequent events fails.

type Base

type Base struct {
	// contains filtered or unexported fields
}

Base is a struct that should be embedded in your aggregate root. It provides the fundamental mechanisms for versioning and tracking uncommitted events. By embedding Base, your aggregate automatically satisfies several required interfaces for the event sourcing framework.

Usage:

type MyAggregate struct {
    aggregate.Base
    // ... other fields
}

func (*Base) Version

func (br *Base) Version() version.Version

Version returns the current version of the aggregate. The version starts at 0 and is incremented for each new event recorded.

Returns the aggregate's current version number.

type CommittedEvents

type CommittedEvents[E event.Any] []E

CommittedEvents represents a strongly-typed slice of events that have been successfully persisted to the event log.

func CommitEvents

func CommitEvents[TID ID, E event.Any, R Root[TID, E]](
	ctx context.Context,
	store event.Log,
	encoder encoding.Encoder,
	transformers []event.Transformer[E],
	root R,
) (version.Version, CommittedEvents[E], error)

CommitEvents orchestrates the process of persisting an aggregate's pending changes. It is a reusable helper for implementing a repository's `Save` method. It flushes events, calculates the expected version, prepares them for storage, and appends them to the event log.

Usage (inside a repository's `Save` method):

newVersion, committed, err := aggregate.CommitEvents(ctx, store, encoder, transformers, root)

Returns the new version of the aggregate, a slice of the events that were just committed, and an error if persistence fails (e.g., `version.ConflictError`).

func CommitEventsWithTX

func CommitEventsWithTX[TX any, TID ID, E event.Any, R Root[TID, E]](
	ctx context.Context,
	transactor event.Transactor[TX],
	txLog event.TransactionalLog[TX],
	processor TransactionalAggregateProcessor[TX, TID, E, R],
	encoder encoding.Encoder,
	transformers []event.Transformer[E],
	root R,
) (version.Version, CommittedEvents[E], error)

CommitEventsWithTX is a transactional version of `CommitEvents`. It saves aggregate events and invokes a `TransactionalAggregateProcessor` within the same database transaction, ensuring atomicity. This is ideal for updating read models or handling outbox patterns.

Usage (inside a transactional repository's `Save` method):

newVersion, committed, err := aggregate.CommitEventsWithTX(
    ctx, transactor, txLog, processor, encoder, transformers, root)

Returns the new aggregate version, the committed events, and an error if the transaction fails.

func (CommittedEvents[E]) All

func (committed CommittedEvents[E]) All() iter.Seq[E]

type ESRepo

type ESRepo[TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}

ESRepo is the standard event-sourced repository implementation. It orchestrates loading and saving aggregates by interacting with an event log and an event registry. By default, it uses JSON for encoding.

func NewESRepo

func NewESRepo[TID ID, E event.Any, R Root[TID, E]](
	eventLog event.Log,
	createRoot func() R,
	transformers []event.Transformer[E],
	opts ...ESRepoOption,
) (*ESRepo[TID, E, R], error)

NewESRepo creates a new event sourced repository. It requires an event log for storage, a factory function to create new aggregate instances, and an optional slice of event transformers. By default, it uses a JSON encoder and automatically registers the aggregate's events.

Usage:

repo, err := NewESRepo(
    eventlog.NewMemory(),
    account.NewEmpty,     // func() *account.Account
    nil,                  // No transformers
)

Returns a fully initialized ESRepo or an error if event registration fails.

func (*ESRepo[TID, E, R]) Get

func (repo *ESRepo[TID, E, R]) Get(ctx context.Context, id TID) (R, error)

Get retrieves the latest state of an aggregate by creating a new instance and replaying all of its events.

func (*ESRepo[TID, E, R]) GetVersion

func (repo *ESRepo[TID, E, R]) GetVersion(
	ctx context.Context,
	id TID,
	selector version.Selector,
) (R, error)

GetVersion retrieves the state of an aggregate at a specific version.

func (*ESRepo[TID, E, R]) LoadAggregate

func (repo *ESRepo[TID, E, R]) LoadAggregate(
	ctx context.Context,
	root R,
	id TID,
	selector version.Selector,
) error

LoadAggregate loads events from the store and applies them to an existing aggregate instance. See `AggregateLoader` for more details.

func (*ESRepo[TID, E, R]) Save

func (repo *ESRepo[TID, E, R]) Save(
	ctx context.Context,
	root R,
) (version.Version, CommittedEvents[E], error)

Save commits all uncommitted events from the aggregate to the event log.

type ESRepoOption

type ESRepoOption func(esRepoConfigurator)

ESRepoOption is a function that configures an ESRepo instance. It is used with `NewESRepo` to customize its behavior.

func AnyEventRegistry

func AnyEventRegistry(anyRegistry event.Registry[event.Any]) ESRepoOption

AnyEventRegistry provides an option to use a shared, global event registry. This allows multiple repositories to share a single registry, preventing duplicate event name registrations across different aggregates.

Usage:

globalRegistry := event.NewRegistry[event.Any]()
repo, err := NewESRepo(..., aggregate.AnyEventRegistry(globalRegistry))

func DontRegisterRoot

func DontRegisterRoot() ESRepoOption

DontRegisterRoot provides an option to prevent the repository from automatically registering the aggregate's events. This is useful when you manage event registration centrally or use a shared registry.

Usage:

repo, err := NewESRepo(..., aggregate.DontRegisterRoot())

func EventCodec added in v0.5.0

func EventCodec(encoder encoding.Codec) ESRepoOption

EventCodec provides an option to override the default JSON encoder with a custom implementation. See the `encoding` package.

Usage:

repo, err := NewESRepo(..., aggregate.EventCodec(myCustomCodec))

type ESRepoWithRetry

type ESRepoWithRetry[TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}

ESRepoWithRetry is a repository decorator that provides automatic retry logic for the Save method, specifically for handling optimistic concurrency conflicts.

By default, it retries a failing Save operation up to 3 times if the error is a `version.ConflictError`. This behavior can be customized by providing `retry.Option` values.

Usage:

baseRepo, err := chronicle.NewEventSourcedRepository(
    eventLog,
    account.NewEmpty,
    nil,
)
if err != nil { /* ... */ }

// Wrap the base repository to add retry capabilities.
repoWithRetry := chronicle.NewESRepoWithRetry(
    baseRepo,
    retry.Attempts(5),
    retry.Delay(100 * time.Millisecond),
    retry.DelayType(retry.BackOffDelay),
)

// Now, calls to repoWithRetry.Save(ctx, agg) will be retried on conflict.

func NewESRepoWithRetry

func NewESRepoWithRetry[TID ID, E event.Any, R Root[TID, E]](
	repo Repository[TID, E, R],
	opts ...retry.Option,
) *ESRepoWithRetry[TID, E, R]

NewESRepoWithRetry wraps an existing repository with retry logic.

It takes the repository to be wrapped and an optional, variadic list of `retry.Option`s from `github.com/avast/retry-go/v4` to customize the retry strategy (e.g., number of attempts, delay, backoff). If no options are provided, it defaults to 3 attempts on conflict errors.

Returns a new `ESRepoWithRetry` instance.

func (*ESRepoWithRetry[TID, E, R]) Get

func (e *ESRepoWithRetry[TID, E, R]) Get(ctx context.Context, id TID) (R, error)

func (*ESRepoWithRetry[TID, E, R]) GetVersion

func (e *ESRepoWithRetry[TID, E, R]) GetVersion(
	ctx context.Context,
	id TID,
	selector version.Selector,
) (R, error)

func (*ESRepoWithRetry[TID, E, R]) LoadAggregate

func (e *ESRepoWithRetry[TID, E, R]) LoadAggregate(
	ctx context.Context,
	root R,
	id TID,
	selector version.Selector,
) error

func (*ESRepoWithRetry[TID, E, R]) Save

func (e *ESRepoWithRetry[TID, E, R]) Save(
	ctx context.Context,
	root R,
) (version.Version, CommittedEvents[E], error)

Save attempts to persist the aggregate's uncommitted events. If the underlying repository's Save method returns a `version.ConflictError`, this method will automatically retry the operation according to its configured policy.

Returns the new version of the aggregate and the committed events upon a successful save. If all retries fail, it returns the last error encountered.

type ESRepoWithSnapshots

type ESRepoWithSnapshots[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]] struct {
	SnapshotPolicy SnapshotPolicy[TID, E, R]
	// contains filtered or unexported fields
}

ESRepoWithSnapshots is a decorator for a Repository that adds a snapshotting capability to improve performance for aggregates with long event histories.

When retrieving an aggregate, it first attempts to load from a recent snapshot and then replays only the events that occurred since. When saving, it commits events and then, based on a configured policy, creates and stores a new snapshot of the aggregate's state.

func NewESRepoWithSnapshots

func NewESRepoWithSnapshots[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]](
	esRepo Repository[TID, E, R],
	snapstore SnapshotStore[TID, TS],
	snapshotter Snapshotter[TID, E, R, TS],
	snapPolicy SnapshotPolicy[TID, E, R],
	opts ...ESRepoWithSnapshotsOption,
) (*ESRepoWithSnapshots[TID, E, R, TS], error)

NewESRepoWithSnapshots creates a new repository decorator that adds snapshotting functionality.

Usage:

// baseRepo is a standard event-sourced repository.
// snapStore is an implementation of aggregate.SnapshotStore.
// snapshotter is an implementation of aggregate.Snapshotter.
repoWithSnaps, err := aggregate.NewESRepoWithSnapshots(
	baseRepo,
	snapStore,
	snapshotter,
	aggregate.SnapPolicyFor[...].EveryNEvents(50), // Snapshot every 50 events.
)

Returns a new repository equipped with snapshotting capabilities, or an error if the configuration is invalid.

func (*ESRepoWithSnapshots[TID, E, R, TS]) Get

func (esr *ESRepoWithSnapshots[TID, E, R, TS]) Get(ctx context.Context, id TID) (R, error)

Get retrieves an aggregate's state. It first attempts to load the aggregate from the most recent snapshot. If a snapshot is found, it then replays only the events that occurred after that snapshot's version. If no snapshot is found, it falls back to replaying the aggregate's entire event history.

Usage:

// The repository will automatically use a snapshot if available.
account, err := repo.Get(ctx, accountID)

Returns the fully constituted aggregate root and an error if loading fails.

func (*ESRepoWithSnapshots[TID, E, R, TS]) GetVersion

func (esr *ESRepoWithSnapshots[TID, E, R, TS]) GetVersion(
	ctx context.Context,
	id TID,
	selector version.Selector,
) (R, error)

func (*ESRepoWithSnapshots[TID, E, R, TS]) LoadAggregate

func (esr *ESRepoWithSnapshots[TID, E, R, TS]) LoadAggregate(
	ctx context.Context,
	root R,
	id TID,
	selector version.Selector,
) error

func (*ESRepoWithSnapshots[TID, E, R, TS]) Save

func (esr *ESRepoWithSnapshots[TID, E, R, TS]) Save(
	ctx context.Context,
	root R,
) (version.Version, CommittedEvents[E], error)

Save persists the uncommitted events of an aggregate. After successfully saving the events, it runs the configured SnapshotPolicy to determine whether a new snapshot of the aggregate's state should also be saved.

Usage:

account.DepositMoney(100)
// Events are saved, and a snapshot may be created and saved.
_, _, err := repo.Save(ctx, account)

Returns the new version of the aggregate, the list of committed events, and an error if either event persistence or (if configured to) snapshot persistence fails.

type ESRepoWithSnapshotsOption

type ESRepoWithSnapshotsOption func(esRepoWithSnapshotsConfigurator)

func OnSnapshotError

func OnSnapshotError(fn OnSnapshotErrFunc) ESRepoWithSnapshotsOption

OnSnapshotError provides an option to set a custom handler for snapshot save errors.

Usage:

// Configure the repository to log and ignore snapshot save errors
repo, err := aggregate.NewESRepoWithSnapshots(
    ...,
    aggregate.OnSnapshotError(func(ctx context.Context, err error) error {
        log.Printf("Failed to save snapshot: %v", err)
        return nil // nil means the error is handled and won't be returned by Save
    }),
)

func SnapshotSaveEnabled

func SnapshotSaveEnabled(enabled bool) ESRepoWithSnapshotsOption

SnapshotSaveEnabled provides an option to enable or disable the saving of snapshots. This can be useful for diagnostics or for scenarios where snapshotting is controlled by a separate process. The default is true.

Usage:

// Disable snapshot saving for this repository instance.
repo, err := aggregate.NewESRepoWithSnapshots(
    ...,
    aggregate.SnapshotSaveEnabled(false),
)

type FusedRepo

type FusedRepo[TID ID, E event.Any, R Root[TID, E]] struct {
	AggregateLoader[TID, E, R]
	VersionedGetter[TID, E, R]
	Getter[TID, E, R]
	Saver[TID, E, R]
}

FusedRepo is a convenience type that implements the Repository interface by composing its constituent parts. It is useful for creating repository decorators, such as a retry wrapper, where you might only want to override one behavior (like Save) while keeping the others.

Usage:

// Wrap a standard repository's Save method with a custom retry mechanism
baseRepo := ...
repoWithRetry := &FusedRepo[...]{
    AggregateLoader: baseRepo,
    VersionedGetter: baseRepo,
    Getter:          baseRepo,
    Saver:           &MyCustomSaverWithRetry{saver: baseRepo},
}

type Getter

type Getter[TID ID, E event.Any, R Root[TID, E]] interface {
	Get(ctx context.Context, id TID) (R, error)
}

Getter is responsible for retrieving the latest state of an aggregate root. It loads all events for the given ID and replays them to reconstruct the aggregate.

Usage:

// repo implements Getter
account, err := repo.Get(ctx, "account-123")

Returns the fully hydrated aggregate root or an error if the aggregate is not found or loading fails.

type ID

type ID interface {
	fmt.Stringer
}

ID is a constraint for an aggregate's identifier. Any type used as an ID must implement the `fmt.Stringer` interface.

type IDer

type IDer[TID ID] interface {
	ID() TID
}

type OnSnapshotErrFunc

type OnSnapshotErrFunc = func(ctx context.Context, err error) error

OnSnapshotErrFunc defines the signature for a function that handles errors occurring during the saving of a snapshot.

Since snapshots are a performance optimization and not the source of truth, it is often acceptable to log the error and continue without returning it, preventing a snapshot failure from failing the entire operation.

Usage:

// Configure the repository to log snapshot errors but not fail the Save operation.
option := aggregate.OnSnapshotError(func(ctx context.Context, err error) error {
    log.Printf("Warning: failed to save snapshot: %v", err)
    return nil // Returning nil ignores the error.
})

type ProcessorChain

type ProcessorChain[TX any, TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}

ProcessorChain chains multiple TransactionalAggregateProcessor instances together, executing them sequentially as a single unit. It implements the TransactionalAggregateProcessor interface itself, allowing it to be used anywhere a single processor is expected.

This is useful for composing multiple transactional side-effects for a single aggregate save operation, such as updating a read model *and* creating an outbox entry in the same transaction.

If any processor in the chain returns an error, execution stops immediately, and the error is returned. This ensures that the entire chain's operations, and thus the parent transaction, are atomic.

Example:

// Create individual processors
outboxProcessor := &OutboxProcessor{}
projectionProcessor := &ProjectionProcessor{}

// Combine them into a single chain
processorChain := NewProcessorChain(
    outboxProcessor,
    projectionProcessor,
)

// Use the chain with a transactional repository
repo, err := NewTransactionalRepository(
    db,
    account.NewEmpty,
    nil,
    processorChain, // The chain acts as a single processor
)

func NewProcessorChain

func NewProcessorChain[TX any, TID ID, E event.Any, R Root[TID, E]](
	processors ...TransactionalAggregateProcessor[TX, TID, E, R],
) ProcessorChain[TX, TID, E, R]

NewProcessorChain creates a new ProcessorChain from the provided processors. The processors will be executed by the chain's Process method in the same order they are provided here.

func (ProcessorChain[TX, TID, E, R]) Process

func (pc ProcessorChain[TX, TID, E, R]) Process(
	ctx context.Context,
	tx TX,
	root R,
	events CommittedEvents[E],
) error

Process executes each processor in the chain sequentially. It satisfies the TransactionalAggregateProcessor interface.

Each processor is called with the same context, transaction handle, aggregate root, and committed events.

If any processor returns an error, execution of the chain is halted, and the error is returned immediately. This will cause the parent transaction to be rolled back. If all processors complete successfully, it returns nil.

type Repository

type Repository[TID ID, E event.Any, R Root[TID, E]] interface {
	AggregateLoader[TID, E, R]
	VersionedGetter[TID, E, R]
	Getter[TID, E, R]
	Saver[TID, E, R]
}

Repository combines the core operations for loading and saving an aggregate root. It is the primary interface for interacting with aggregates.

type Root

type Root[TID ID, E event.Any] interface {
	Aggregate[TID, E]
	Versioner
	event.EventFuncCreator[E]
	// contains filtered or unexported methods
}

Root represents the complete contract for an aggregate root in this framework. It combines the `Aggregate`, `Versioner`, and `event.EventFuncCreator` interfaces, along with internal methods for event handling. The easiest way to satisfy this interface is to embed `aggregate.Base` in your struct.

type Saver

type Saver[TID ID, E event.Any, R Root[TID, E]] interface {
	Save(ctx context.Context, root R) (version.Version, CommittedEvents[E], error)
}

Saver is responsible for persisting the uncommitted events of an aggregate root.

Usage:

// repo implements Saver
newVersion, committedEvents, err := repo.Save(ctx, myAccount)

Returns the new version of the aggregate, the events that were committed, and an error if saving fails (e.g., a `version.ConflictError`).

type Snapshot

type Snapshot[TID ID] interface {
	IDer[TID]
	Versioner
}

Snapshot represents the contract for an encoded, point-in-time representation of an aggregate's state. Implementations of this interface hold the necessary data to reconstruct an aggregate, along with its ID and the version at which the snapshot was taken.

Usage:

type AccountSnapshot struct {
    AccountID        account.AccountID
    AggregateVersion version.Version
    Balance          int
}

func (s *AccountSnapshot) ID() account.AccountID { return s.AccountID }
func (s *AccountSnapshot) Version() version.Version { return s.AggregateVersion }

type SnapshotPolicy added in v0.5.0

type SnapshotPolicy[TID ID, E event.Any, R Root[TID, E]] interface {
	ShouldSnapshot(

		ctx context.Context,

		root R,

		previousVersion version.Version,

		newVersion version.Version,

		committedEvents CommittedEvents[E],
	) bool
}

SnapshotPolicy defines the policy for when to create a snapshot of an aggregate. Implementations of this interface allow for flexible and domain-specific rules to decide if a snapshot is wanted after a successful save operation.

Usage:

// This interface is used when configuring a repository with snapshotting.
type myCustomPolicy struct {}
func (s *myCustomPolicy) ShouldSnapshot(...) bool {
	// custom logic
	return true
}

repoWithSnaps, _ := chronicle.NewEventSourcedRepositoryWithSnapshots(
    baseRepo,
    snapStore,
    snapshotter,
    &myCustomPolicy{},
)

type SnapshotStore

type SnapshotStore[TID ID, TS Snapshot[TID]] interface {
	SaveSnapshot(ctx context.Context, snapshot TS) error

	// GetSnapshot retrieves the latest snapshot for a given aggregate ID.
	//
	// Returns the snapshot, a boolean indicating if it was found, and an error if one occurred.
	GetSnapshot(ctx context.Context, aggregateID TID) (TS, bool, error)
}

SnapshotStore defines the contract for a storage mechanism for snapshots. Implementations are responsible for persisting and retrieving snapshot data, for example, in memory, in a database table, or in a key-value store.

Usage:

// snapStore is an implementation of SnapshotStore, e.g., snapshotstore.NewMemory()
err := snapStore.SaveSnapshot(ctx, mySnapshot)
if err != nil {
    // handle error
}

snapshot, found, err := snapStore.GetSnapshot(ctx, aggregateID)
if err != nil {
    // handle error
}
if found {
    // use the snapshot
}

type Snapshotter

type Snapshotter[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]] interface {
	ToSnapshot(R) (TS, error)
	FromSnapshot(TS) (R, error)
}

Snapshotter defines the mechanism for converting a live aggregate root into its snapshot representation and back. It acts as a bridge between the domain object and its persisted state.

Usage:

type AccountSnapshotter struct{}

func (s *AccountSnapshotter) ToSnapshot(acc *account.Account) (*account.Snapshot, error) {
    return &account.Snapshot{
        AccountID:        acc.ID(),
        AggregateVersion: acc.Version(),
        Balance:          acc.Balance(),
    }, nil
}

func (s *AccountSnapshotter) FromSnapshot(snap *account.Snapshot) (*account.Account, error) {
    // Recreate the aggregate from snapshot data
    acc := account.NewEmpty()
    // ... set fields from snap ...
    return acc, nil
}

type TransactionalAggregateProcessor

type TransactionalAggregateProcessor[TX any, TID ID, E event.Any, R Root[TID, E]] interface {
	// Process is called by the TransactionalRepository *inside* an active transaction,
	// immediately after the aggregate's events have been successfully saved to the event log.
	// It receives the transaction handle, the aggregate in its new state, and the
	// strongly-typed events that were just committed.
	//
	// Returns an error if processing fails. This will cause the entire transaction to be
	// rolled back, including the saving of the events. Returns nil on success.
	Process(ctx context.Context, tx TX, root R, events CommittedEvents[E]) error
}

TransactionalAggregateProcessor defines a contract for processing an aggregate and its committed events within the same transaction as the save operation. This is a high-level, type-safe hook that is useful for atomically updating read models (projections) or creating outbox messages.

The 'Process' method is called by a TransactionalRepository *inside* an active transaction, immediately after the aggregate's events have been successfully saved to the event log. This guarantees that the event log write and any side effects performed by the processor (like updating a projection table or inserting an outbox message) either all succeed or all fail together.

TX is the transaction handle type (e.g., *sql.Tx). TID is the aggregate's ID type. E is the aggregate's base event type. R is the aggregate root type.

Usage (Outbox Pattern with *sql.Tx):

// Assume an outbox table:
// CREATE TABLE outbox_messages (id SERIAL PRIMARY KEY, event_name TEXT, payload JSONB);

import (
	"database/sql"
	"github.com/DeluxeOwl/chronicle/examples/internal/account"
)

type OutboxProcessor struct {
	// ... dependencies like a logger
}

func (p *OutboxProcessor) Process(
	ctx context.Context,
	tx *sql.Tx, // The active database transaction
	root *account.Account,
	events CommittedEvents[account.AccountEvent],
) error {
	stmt, err := tx.PrepareContext(ctx, "INSERT INTO outbox_messages (event_name, payload) VALUES ($1, $2)")
	if err != nil {
		return fmt.Errorf("prepare outbox insert: %w", err)
	}
	defer stmt.Close()

	for evt := range events.All() {
		payload, err := json.Marshal(evt)
		if err != nil {
			return fmt.Errorf("marshal event %s for outbox: %w", evt.EventName(), err)
		}

		if _, err := stmt.ExecContext(ctx, evt.EventName(), payload); err != nil {
			return fmt.Errorf("insert event %s into outbox: %w", evt.EventName(), err)
		}
	}
	return nil
}

// Then, wire it into a transactional repository:
// outboxProcessor := &OutboxProcessor{}
// repo, err := NewTransactionalRepository(
//     postgresEventLog, // a transactional event log
//     account.NewEmpty,
//     nil, // transformers
//     outboxProcessor,
// )

type TransactionalRepository

type TransactionalRepository[T any, TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}

TransactionalRepository is a repository implementation that ensures that saving an aggregate's events and processing them (e.g., for updating projections) occur within a single, atomic transaction. It orchestrates operations using an `event.Transactor` and an `event.TransactionalLog`, and can execute a type-safe `TransactionalAggregateProcessor` as part of the same transaction. This is the useful for implementing strongly consistent read models or the transactional outbox pattern.

func NewTransactionalRepository

func NewTransactionalRepository[TX any, TID ID, E event.Any, R Root[TID, E]](
	log event.TransactionalEventLog[TX],
	createRoot func() R,
	transformers []event.Transformer[E],
	aggProcessor TransactionalAggregateProcessor[TX, TID, E, R],
	opts ...ESRepoOption,
) (*TransactionalRepository[TX, TID, E, R], error)

NewTransactionalRepository creates a repository that manages operations within an atomic transaction. This constructor is a convenience for when the event log implementation (like the provided Postgres or Sqlite logs) also serves as the transaction manager by implementing `event.TransactionalEventLog`.

Usage:

// postgresLog implements event.TransactionalEventLog[*sql.Tx]
// myProcessor implements aggregate.TransactionalAggregateProcessor for *sql.Tx
repo, err := aggregate.NewTransactionalRepository(
    postgresLog,
    account.NewEmpty,
    nil, // no transformers
    myProcessor,
)

Returns a new `*TransactionalRepository` configured for atomic operations, or an error if setup fails.

func NewTransactionalRepositoryWithTransactor

func NewTransactionalRepositoryWithTransactor[TX any, TID ID, E event.Any, R Root[TID, E]](
	transactor event.Transactor[TX],
	txLog event.TransactionalLog[TX],
	createRoot func() R,
	transformers []event.Transformer[E],
	aggProcessor TransactionalAggregateProcessor[TX, TID, E, R],
	opts ...ESRepoOption,
) (*TransactionalRepository[TX, TID, E, R], error)

NewTransactionalRepositoryWithTransactor creates a transactional repository with a separate transactor and log. This constructor provides more flexibility by decoupling the transaction management from the event storage logic. It is useful in advanced scenarios where you might use a generic transaction coordinator.

Usage:

// myTransactor implements event.Transactor[*sql.Tx]
// myTxLog implements event.TransactionalLog[*sql.Tx]
repo, err := aggregate.NewTransactionalRepositoryWithTransactor(
    myTransactor,
    myTxLog,
    account.NewEmpty,
    nil, // no transformers
    myProcessor,
)

Returns a new `*TransactionalRepository`, or an error if setup fails.

func (*TransactionalRepository[TX, TID, E, R]) Get

func (repo *TransactionalRepository[TX, TID, E, R]) Get(ctx context.Context, id TID) (R, error)

func (*TransactionalRepository[TX, TID, E, R]) GetVersion

func (repo *TransactionalRepository[TX, TID, E, R]) GetVersion(
	ctx context.Context,
	id TID,
	selector version.Selector,
) (R, error)

func (*TransactionalRepository[TX, TID, E, R]) LoadAggregate

func (repo *TransactionalRepository[TX, TID, E, R]) LoadAggregate(
	ctx context.Context,
	root R,
	id TID,
	selector version.Selector,
) error

func (*TransactionalRepository[TX, TID, E, R]) Save

func (repo *TransactionalRepository[TX, TID, E, R]) Save(
	ctx context.Context,
	root R,
) (version.Version, CommittedEvents[E], error)

Save atomically persists the aggregate's uncommitted events and executes the configured transactional processor within a single database transaction. If any part of the process fails, the entire transaction is rolled back, ensuring data consistency.

Usage:

// acc is an aggregate with uncommitted events
newVersion, committedEvents, err := transactionalRepo.Save(ctx, acc)
if err != nil {
    // The transaction was rolled back.
    // Handle the conflict or transient error.
}

Returns the aggregate's new version and the list of committed events on success. Returns an error if saving the events or executing the processor fails.

type UncommittedEvents

type UncommittedEvents[E event.Any] []E

UncommittedEvents represents a strongly-typed slice of events that have been recorded on an aggregate but not yet persisted to the event log.

func FlushUncommittedEvents

func FlushUncommittedEvents[TID ID, E event.Any, R Root[TID, E]](
	root R,
) UncommittedEvents[E]

FlushUncommittedEvents clears the pending events from an aggregate and returns them as a strongly-typed slice. This is the first step in the `Save` process.

Usage (called by `CommitEvents`):

uncommitted := aggregate.FlushUncommittedEvents(root)

Returns a slice of the uncommitted events.

type VersionedGetter

type VersionedGetter[TID ID, E event.Any, R Root[TID, E]] interface {
	GetVersion(ctx context.Context, id TID, selector version.Selector) (R, error)
}

VersionedGetter is responsible for retrieving an aggregate root at a specific version. It loads and replays events up to the version specified in the selector.

Usage:

// repo implements VersionedGetter
// Get the state of the account after its 5th event
account, err := repo.GetVersion(ctx, "account-123", version.Selector{From: 1, To: 5})

Returns the hydrated aggregate root at the specified version or an error.

type Versioner

type Versioner interface {
	Version() version.Version
}

Versioner is an interface for any type that can be versioned. This is typically implemented by embedding the `aggregate.Base` struct.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL