Documentation ¶
Index ¶
- Variables
- func ApplyReadTransformers[E event.Any](ctx context.Context, events []E, transformers []event.Transformer[E]) ([]E, error)
- func ApplyWriteTransformers[E event.Any](ctx context.Context, events []E, transformers []event.Transformer[E]) ([]E, error)
- func LoadFromRecords[TID ID, E event.Any](ctx context.Context, root Root[TID, E], registry event.Registry[E], ...) error
- func LoadFromSnapshot[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]](ctx context.Context, store SnapshotStore[TID, TS], ...) (R, bool, error)
- func RawEventsFromUncommitted[E event.Any](ctx context.Context, encoder encoding.Encoder, ...) ([]event.Raw, error)
- func ReadAndLoadFromStore[TID ID, E event.Any](ctx context.Context, root Root[TID, E], store event.Log, ...) error
- func RecordEvent[TID ID, E event.Any](root Root[TID, E], e E) error
- func RecordEvents[TID ID, E event.Any](root Root[TID, E], events ...E) error
- func SnapPolicyFor[R Root[TID, E], TID ID, E event.Any]() *policyBuilder[TID, E, R]
- type Aggregate
- type AggregateLoader
- type Base
- type CommittedEvents
- func CommitEvents[TID ID, E event.Any, R Root[TID, E]](ctx context.Context, store event.Log, encoder encoding.Encoder, ...) (version.Version, CommittedEvents[E], error)
- func CommitEventsWithTX[TX any, TID ID, E event.Any, R Root[TID, E]](ctx context.Context, transactor event.Transactor[TX], ...) (version.Version, CommittedEvents[E], error)
- type ESRepo
- func (repo *ESRepo[TID, E, R]) Get(ctx context.Context, id TID) (R, error)
- func (repo *ESRepo[TID, E, R]) GetVersion(ctx context.Context, id TID, selector version.Selector) (R, error)
- func (repo *ESRepo[TID, E, R]) LoadAggregate(ctx context.Context, root R, id TID, selector version.Selector) error
- func (repo *ESRepo[TID, E, R]) Save(ctx context.Context, root R) (version.Version, CommittedEvents[E], error)
- type ESRepoOption
- type ESRepoWithRetry
- func (e *ESRepoWithRetry[TID, E, R]) Get(ctx context.Context, id TID) (R, error)
- func (e *ESRepoWithRetry[TID, E, R]) GetVersion(ctx context.Context, id TID, selector version.Selector) (R, error)
- func (e *ESRepoWithRetry[TID, E, R]) LoadAggregate(ctx context.Context, root R, id TID, selector version.Selector) error
- func (e *ESRepoWithRetry[TID, E, R]) Save(ctx context.Context, root R) (version.Version, CommittedEvents[E], error)
- type ESRepoWithSnapshots
- func (esr *ESRepoWithSnapshots[TID, E, R, TS]) Get(ctx context.Context, id TID) (R, error)
- func (esr *ESRepoWithSnapshots[TID, E, R, TS]) GetVersion(ctx context.Context, id TID, selector version.Selector) (R, error)
- func (esr *ESRepoWithSnapshots[TID, E, R, TS]) LoadAggregate(ctx context.Context, root R, id TID, selector version.Selector) error
- func (esr *ESRepoWithSnapshots[TID, E, R, TS]) Save(ctx context.Context, root R) (version.Version, CommittedEvents[E], error)
- type ESRepoWithSnapshotsOption
- type FusedRepo
- type Getter
- type ID
- type IDer
- type OnSnapshotErrFunc
- type ProcessorChain
- type Repository
- type Root
- type Saver
- type Snapshot
- type SnapshotPolicy
- type SnapshotStore
- type Snapshotter
- type TransactionalAggregateProcessor
- type TransactionalRepository
- func NewTransactionalRepository[TX any, TID ID, E event.Any, R Root[TID, E]](log event.TransactionalEventLog[TX], createRoot func() R, ...) (*TransactionalRepository[TX, TID, E, R], error)
- func NewTransactionalRepositoryWithTransactor[TX any, TID ID, E event.Any, R Root[TID, E]](transactor event.Transactor[TX], txLog event.TransactionalLog[TX], ...) (*TransactionalRepository[TX, TID, E, R], error)
- func (repo *TransactionalRepository[TX, TID, E, R]) Get(ctx context.Context, id TID) (R, error)
- func (repo *TransactionalRepository[TX, TID, E, R]) GetVersion(ctx context.Context, id TID, selector version.Selector) (R, error)
- func (repo *TransactionalRepository[TX, TID, E, R]) LoadAggregate(ctx context.Context, root R, id TID, selector version.Selector) error
- func (repo *TransactionalRepository[TX, TID, E, R]) Save(ctx context.Context, root R) (version.Version, CommittedEvents[E], error)
- type UncommittedEvents
- type VersionedGetter
- type Versioner
Constants ¶
This section is empty.
Variables ¶
var ErrRootNotFound = errors.New("root not found")
Functions ¶
func ApplyReadTransformers ¶ added in v0.5.0
func ApplyWriteTransformers ¶ added in v0.5.0
func LoadFromRecords ¶
func LoadFromRecords[TID ID, E event.Any]( ctx context.Context, root Root[TID, E], registry event.Registry[E], decoder encoding.Decoder, transformers []event.Transformer[E], records event.Records, ) error
LoadFromRecords hydrates an aggregate root from an iterator of event records. For each record, it decodes the data into a concrete event type, applies any transformations, and then applies the event to the root.
Usage (used by `ReadAndLoadFromStore` and snapshot loading logic):
err := aggregate.LoadFromRecords(ctx, root, registry, decoder, transformers, records)
Returns an error if decoding, transformation, or application of an event fails.
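The decode, transform, apply loop described above can be pictured with a small self-contained sketch. Everything in it (`record`, `added`, `counter`, `loadFromRecords`) is a hypothetical stand-in built on `encoding/json`; it is not the library's implementation and only illustrates the order of the steps and the fail-fast behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// record is a hypothetical stand-in for a stored event record.
type record struct {
	Name string
	Data []byte
}

// added is a hypothetical event type.
type added struct {
	N int `json:"n"`
}

// counter is a hypothetical aggregate that sums the events applied to it.
type counter struct {
	Total   int
	Version int
}

// apply mutates state for one event and rejects negative amounts.
func (c *counter) apply(e added) error {
	if e.N < 0 {
		return errors.New("negative amount")
	}
	c.Total += e.N
	c.Version++
	return nil
}

// loadFromRecords decodes each record, runs it through the transformers,
// and applies the result to the root, stopping at the first failure.
func loadFromRecords(root *counter, transformers []func(added) (added, error), records []record) error {
	for _, rec := range records {
		var e added
		if err := json.Unmarshal(rec.Data, &e); err != nil {
			return fmt.Errorf("decode %s: %w", rec.Name, err)
		}
		for _, t := range transformers {
			var err error
			if e, err = t(e); err != nil {
				return fmt.Errorf("transform %s: %w", rec.Name, err)
			}
		}
		if err := root.apply(e); err != nil {
			return fmt.Errorf("apply %s: %w", rec.Name, err)
		}
	}
	return nil
}

func main() {
	records := []record{
		{Name: "added", Data: []byte(`{"n":2}`)},
		{Name: "added", Data: []byte(`{"n":3}`)},
	}
	// double is a hypothetical read-side transformer.
	double := func(e added) (added, error) { e.N *= 2; return e, nil }

	root := &counter{}
	err := loadFromRecords(root, []func(added) (added, error){double}, records)
	fmt.Println(root.Total, root.Version, err)
}
```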
func LoadFromSnapshot ¶
func LoadFromSnapshot[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]]( ctx context.Context, store SnapshotStore[TID, TS], snapshotter Snapshotter[TID, E, R, TS], aggregateID TID, ) (R, bool, error)
LoadFromSnapshot orchestrates the retrieval and rehydration of an aggregate from a snapshot. It fetches the latest snapshot from the store and uses the snapshotter to convert it back into a live aggregate root instance, with its version correctly set.
Usage:
// Typically used within a repository's Get method.
root, found, err := aggregate.LoadFromSnapshot(
ctx,
snapshotStore,
snapshotter,
aggregateID,
)
if err != nil {
return nil, err
}
if found {
// The aggregate is partially loaded from the snapshot.
// Now, load subsequent events from the event log.
} else {
// No snapshot found, load from the beginning of the event log.
}
Returns the rehydrated aggregate root, a boolean indicating if a snapshot was found, and any error that occurred during the process.
func RawEventsFromUncommitted ¶
func RawEventsFromUncommitted[E event.Any]( ctx context.Context, encoder encoding.Encoder, transformers []event.Transformer[E], uncommitted UncommittedEvents[E], ) ([]event.Raw, error)
RawEventsFromUncommitted converts a slice of strongly-typed uncommitted events into a slice of `event.Raw` events. It applies write-side transformers and encodes the event data during this process.
Usage (called by `CommitEvents`):
rawEvents, err := aggregate.RawEventsFromUncommitted(ctx, encoder, transformers, uncommitted)
Returns a slice of `event.Raw` ready for storage, or an error if transformation or encoding fails.
func ReadAndLoadFromStore ¶
func ReadAndLoadFromStore[TID ID, E event.Any]( ctx context.Context, root Root[TID, E], store event.Log, registry event.Registry[E], decoder encoding.Decoder, transformers []event.Transformer[E], id TID, selector version.Selector, ) error
ReadAndLoadFromStore is a framework helper that orchestrates loading an aggregate. It reads event records from the event store and uses them to hydrate a new instance of an aggregate root.
Usage (typically inside a repository's Get method):
err := aggregate.ReadAndLoadFromStore(ctx, root, store, registry, decoder, transformers, id, selector)
Returns an error if reading from the store, decoding, or applying any event fails, or if the aggregate is not found (i.e., has no events).
func RecordEvent ¶
RecordEvent is the primary function for recording a new event against an aggregate. It first applies the event to the aggregate to update its state, and upon success, adds the event to an internal list of uncommitted events to be persisted later.
A recommended pattern is to create a private, type-safe wrapper for this function on your aggregate. This improves type safety and autocompletion within your command methods.
Usage:
// 1. Define a private helper method on your aggregate that calls RecordEvent.
// Notice the parameter is `AccountEvent`, your specific event sum type.
func (a *Account) recordThat(event AccountEvent) error {
return aggregate.RecordEvent(a, event)
}
// 2. Use this helper in your command methods. The Go compiler will now ensure
// you only pass valid AccountEvent types.
func (a *Account) DepositMoney(amount int) error {
if amount <= 0 {
return errors.New("amount must be positive")
}
return a.recordThat(&moneyDeposited{Amount: amount})
}
Returns an error if the aggregate's `Apply` method returns an error.
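The apply-then-record ordering matters because a rejected event must never end up in the list of changes to persist. The sketch below shows that ordering with hypothetical stand-ins (`account`, `pending`, `recordEvent`); the real bookkeeping lives in the framework's unexported fields and may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
)

// account is a hypothetical aggregate; pending stands in for the
// uncommitted-event list that the framework tracks internally.
type account struct {
	balance int
	pending []int
}

// apply is the state transition; it rejects overdrafts.
func (a *account) apply(delta int) error {
	if a.balance+delta < 0 {
		return errors.New("insufficient funds")
	}
	a.balance += delta
	return nil
}

// recordEvent applies the event first and appends it to the pending list
// only when apply succeeded, so a rejected event is never queued.
func recordEvent(a *account, delta int) error {
	if err := a.apply(delta); err != nil {
		return err
	}
	a.pending = append(a.pending, delta)
	return nil
}

func main() {
	acc := &account{}
	fmt.Println(recordEvent(acc, 100))  // accepted and queued
	fmt.Println(recordEvent(acc, -500)) // rejected, nothing queued
	fmt.Println(acc.balance, len(acc.pending))
}
```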
func RecordEvents ¶
RecordEvents is a convenience function to record multiple events in a single call. Each event is applied and recorded sequentially.
Usage (inside a command method on your aggregate):
return aggregate.RecordEvents(a, &eventOne{}, &eventTwo{})
Returns an error if any of the `Apply` calls fail.
func SnapPolicyFor ¶ added in v0.5.0
SnapPolicyFor provides a fluent builder for creating common snapshot policies.
Usage:
// Create a policy that snapshots every 10 events.
policy := aggregate.SnapPolicyFor[*account.Account]().EveryNEvents(10)
// Create a composite policy.
policy2 := aggregate.SnapPolicyFor[*account.Account]().AnyOf(...)
Types ¶
type Aggregate ¶
Aggregate defines the core behavior of an event-sourced aggregate. It must be able to change its state by applying an event and must provide its unique ID.
type AggregateLoader ¶
type AggregateLoader[TID ID, E event.Any, R Root[TID, E]] interface {
	LoadAggregate(
		ctx context.Context,
		root R,
		id TID,
		selector version.Selector,
	) error
}
AggregateLoader is an interface for loading events and applying them to an *existing* instance of an aggregate root. This is primarily used by the snapshotting mechanism, which first creates an aggregate from a snapshot, then uses this interface to load and apply only the new events that occurred after the snapshot was taken.
Usage (internal, used by repository with snapshots):
// repo implements AggregateLoader
// root is an aggregate created from a snapshot at version 10
err := repo.LoadAggregate(ctx, root, "id-123", version.Selector{From: 11})
Returns an error if loading or applying subsequent events fails.
type Base ¶
type Base struct {
// contains filtered or unexported fields
}
Base is a struct that should be embedded in your aggregate root. It provides the fundamental mechanisms for versioning and tracking uncommitted events. By embedding Base, your aggregate automatically satisfies several required interfaces for the event sourcing framework.
Usage:
type MyAggregate struct {
aggregate.Base
// ... other fields
}
type CommittedEvents ¶
CommittedEvents represents a strongly-typed slice of events that have been successfully persisted to the event log.
func CommitEvents ¶
func CommitEvents[TID ID, E event.Any, R Root[TID, E]]( ctx context.Context, store event.Log, encoder encoding.Encoder, transformers []event.Transformer[E], root R, ) (version.Version, CommittedEvents[E], error)
CommitEvents orchestrates the process of persisting an aggregate's pending changes. It is a reusable helper for implementing a repository's `Save` method. It flushes the root's uncommitted events, calculates the expected version, applies write-side transformers and encodes the events for storage, and appends them to the event log.
Usage (inside a repository's `Save` method):
newVersion, committed, err := aggregate.CommitEvents(ctx, store, encoder, transformers, root)
Returns the new version of the aggregate, a slice of the events that were just committed, and an error if persistence fails (e.g., `version.ConflictError`).
func CommitEventsWithTX ¶
func CommitEventsWithTX[TX any, TID ID, E event.Any, R Root[TID, E]]( ctx context.Context, transactor event.Transactor[TX], txLog event.TransactionalLog[TX], processor TransactionalAggregateProcessor[TX, TID, E, R], encoder encoding.Encoder, transformers []event.Transformer[E], root R, ) (version.Version, CommittedEvents[E], error)
CommitEventsWithTX is a transactional version of `CommitEvents`. It saves aggregate events and invokes a `TransactionalAggregateProcessor` within the same database transaction, ensuring atomicity. This is ideal for updating read models or handling outbox patterns.
Usage (inside a transactional repository's `Save` method):
newVersion, committed, err := aggregate.CommitEventsWithTX(
ctx, transactor, txLog, processor, encoder, transformers, root)
Returns the new aggregate version, the committed events, and an error if the transaction fails.
func (CommittedEvents[E]) All ¶
func (committed CommittedEvents[E]) All() iter.Seq[E]
type ESRepo ¶
type ESRepo[TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}
ESRepo is the standard event-sourced repository implementation. It orchestrates loading and saving aggregates by interacting with an event log and an event registry. By default, it uses JSON for encoding.
func NewESRepo ¶
func NewESRepo[TID ID, E event.Any, R Root[TID, E]]( eventLog event.Log, createRoot func() R, transformers []event.Transformer[E], opts ...ESRepoOption, ) (*ESRepo[TID, E, R], error)
NewESRepo creates a new event-sourced repository. It requires an event log for storage, a factory function to create new aggregate instances, and an optional slice of event transformers. By default, it uses a JSON encoder and automatically registers the aggregate's events.
Usage:
repo, err := NewESRepo(
eventlog.NewMemory(),
account.NewEmpty, // func() *account.Account
nil, // No transformers
)
Returns a fully initialized ESRepo or an error if event registration fails.
func (*ESRepo[TID, E, R]) Get ¶
Get retrieves the latest state of an aggregate by creating a new instance and replaying all of its events.
func (*ESRepo[TID, E, R]) GetVersion ¶
func (repo *ESRepo[TID, E, R]) GetVersion( ctx context.Context, id TID, selector version.Selector, ) (R, error)
GetVersion retrieves the state of an aggregate at a specific version.
type ESRepoOption ¶
type ESRepoOption func(esRepoConfigurator)
ESRepoOption is a function that configures an ESRepo instance. It is used with `NewESRepo` to customize its behavior.
func AnyEventRegistry ¶
func AnyEventRegistry(anyRegistry event.Registry[event.Any]) ESRepoOption
AnyEventRegistry provides an option to use a shared, global event registry. This allows multiple repositories to share a single registry, preventing duplicate event name registrations across different aggregates.
Usage:
globalRegistry := event.NewRegistry[event.Any]()
repo, err := NewESRepo(..., aggregate.AnyEventRegistry(globalRegistry))
func DontRegisterRoot ¶
func DontRegisterRoot() ESRepoOption
DontRegisterRoot provides an option to prevent the repository from automatically registering the aggregate's events. This is useful when you manage event registration centrally or use a shared registry.
Usage:
repo, err := NewESRepo(..., aggregate.DontRegisterRoot())
func EventCodec ¶ added in v0.5.0
func EventCodec(encoder encoding.Codec) ESRepoOption
EventCodec provides an option to override the default JSON encoder with a custom implementation. See the `encoding` package.
Usage:
repo, err := NewESRepo(..., aggregate.EventCodec(myCustomCodec))
type ESRepoWithRetry ¶
type ESRepoWithRetry[TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}
ESRepoWithRetry is a repository decorator that provides automatic retry logic for the Save method, specifically for handling optimistic concurrency conflicts.
By default, it retries a failing Save operation up to 3 times if the error is a `version.ConflictError`. This behavior can be customized by providing `retry.Option` values.
Usage:
baseRepo, err := chronicle.NewEventSourcedRepository(
eventLog,
account.NewEmpty,
nil,
)
if err != nil { /* ... */ }
// Wrap the base repository to add retry capabilities.
repoWithRetry := chronicle.NewESRepoWithRetry(
baseRepo,
retry.Attempts(5),
retry.Delay(100 * time.Millisecond),
retry.DelayType(retry.BackOffDelay),
)
// Now, calls to repoWithRetry.Save(ctx, agg) will be retried on conflict.
func NewESRepoWithRetry ¶
func NewESRepoWithRetry[TID ID, E event.Any, R Root[TID, E]]( repo Repository[TID, E, R], opts ...retry.Option, ) *ESRepoWithRetry[TID, E, R]
NewESRepoWithRetry wraps an existing repository with retry logic.
It takes the repository to be wrapped and an optional, variadic list of `retry.Option`s from `github.com/avast/retry-go/v4` to customize the retry strategy (e.g., number of attempts, delay, backoff). If no options are provided, it defaults to 3 attempts on conflict errors.
Returns a new `ESRepoWithRetry` instance.
func (*ESRepoWithRetry[TID, E, R]) Get ¶
func (e *ESRepoWithRetry[TID, E, R]) Get(ctx context.Context, id TID) (R, error)
func (*ESRepoWithRetry[TID, E, R]) GetVersion ¶
func (*ESRepoWithRetry[TID, E, R]) LoadAggregate ¶
func (*ESRepoWithRetry[TID, E, R]) Save ¶
func (e *ESRepoWithRetry[TID, E, R]) Save( ctx context.Context, root R, ) (version.Version, CommittedEvents[E], error)
Save attempts to persist the aggregate's uncommitted events. If the underlying repository's Save method returns a `version.ConflictError`, this method will automatically retry the operation according to its configured policy.
Returns the new version of the aggregate and the committed events upon a successful save. If all retries fail, it returns the last error encountered.
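The retry behavior can be approximated with the standard library alone. The sketch below is an assumption-laden illustration, not the decorator's code: `errConflict` stands in for `version.ConflictError`, `saveWithRetry` is a hypothetical helper, and the real type delegates scheduling (delay, backoff) to `retry-go` options.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a hypothetical stand-in for version.ConflictError.
var errConflict = errors.New("version conflict")

// saveWithRetry calls save up to attempts times, retrying only when the
// error is a conflict. Any other error is returned immediately.
func saveWithRetry(attempts int, save func() (int, error)) (int, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		v, err := save()
		if err == nil {
			return v, nil
		}
		lastErr = err
		if !errors.Is(err, errConflict) {
			break
		}
	}
	return 0, lastErr
}

func main() {
	calls := 0
	// flaky conflicts twice, then succeeds at version 7.
	flaky := func() (int, error) {
		calls++
		if calls < 3 {
			return 0, fmt.Errorf("append: %w", errConflict)
		}
		return 7, nil
	}
	v, err := saveWithRetry(3, flaky)
	fmt.Println(v, err, calls)
}
```

Matching with `errors.Is` keeps the check working when the conflict error is wrapped by intermediate layers.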
type ESRepoWithSnapshots ¶
type ESRepoWithSnapshots[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]] struct {
	SnapshotPolicy SnapshotPolicy[TID, E, R]
	// contains filtered or unexported fields
}
ESRepoWithSnapshots is a decorator for a Repository that adds a snapshotting capability to improve performance for aggregates with long event histories.
When retrieving an aggregate, it first attempts to load from a recent snapshot and then replays only the events that occurred since. When saving, it commits events and then, based on a configured policy, creates and stores a new snapshot of the aggregate's state.
func NewESRepoWithSnapshots ¶
func NewESRepoWithSnapshots[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]]( esRepo Repository[TID, E, R], snapstore SnapshotStore[TID, TS], snapshotter Snapshotter[TID, E, R, TS], snapPolicy SnapshotPolicy[TID, E, R], opts ...ESRepoWithSnapshotsOption, ) (*ESRepoWithSnapshots[TID, E, R, TS], error)
NewESRepoWithSnapshots creates a new repository decorator that adds snapshotting functionality.
Usage:
// baseRepo is a standard event-sourced repository.
// snapStore is an implementation of aggregate.SnapshotStore.
// snapshotter is an implementation of aggregate.Snapshotter.
repoWithSnaps, err := aggregate.NewESRepoWithSnapshots(
	baseRepo,
	snapStore,
	snapshotter,
	aggregate.SnapPolicyFor[...]().EveryNEvents(50), // Snapshot every 50 events.
)
Returns a new repository equipped with snapshotting capabilities, or an error if the configuration is invalid.
func (*ESRepoWithSnapshots[TID, E, R, TS]) Get ¶
func (esr *ESRepoWithSnapshots[TID, E, R, TS]) Get(ctx context.Context, id TID) (R, error)
Get retrieves an aggregate's state. It first attempts to load the aggregate from the most recent snapshot. If a snapshot is found, it then replays only the events that occurred after that snapshot's version. If no snapshot is found, it falls back to replaying the aggregate's entire event history.
Usage:
// The repository will automatically use a snapshot if available.
account, err := repo.Get(ctx, accountID)
Returns the fully constituted aggregate root and an error if loading fails.
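The saving from a snapshot is simply the number of events that no longer need replaying. A minimal sketch, assuming a hypothetical counter aggregate whose events are integer deltas and whose `snapshot` records the total at a given version:

```go
package main

import "fmt"

// snapshot is a hypothetical point-in-time copy of a counter aggregate.
type snapshot struct {
	Total   int
	Version int
}

// get rebuilds the aggregate from an optional snapshot plus the event
// history. events[i] is the event that produced version i+1.
func get(snap *snapshot, events []int) (total, version, replayed int) {
	if snap != nil {
		// Start from the snapshot and skip the events it already covers.
		total, version = snap.Total, snap.Version
	}
	for _, delta := range events[version:] {
		total += delta
		version++
		replayed++
	}
	return total, version, replayed
}

func main() {
	events := []int{5, 5, 5, 5, 1}
	fmt.Println(get(nil, events))                              // full replay
	fmt.Println(get(&snapshot{Total: 20, Version: 4}, events)) // replay the tail only
}
```

Both calls arrive at the same total and version; only the count of replayed events differs.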
func (*ESRepoWithSnapshots[TID, E, R, TS]) GetVersion ¶
func (*ESRepoWithSnapshots[TID, E, R, TS]) LoadAggregate ¶
func (*ESRepoWithSnapshots[TID, E, R, TS]) Save ¶
func (esr *ESRepoWithSnapshots[TID, E, R, TS]) Save( ctx context.Context, root R, ) (version.Version, CommittedEvents[E], error)
Save persists the uncommitted events of an aggregate. After successfully saving the events, it runs the configured SnapshotPolicy to determine whether a new snapshot of the aggregate's state should also be saved.
Usage:
account.DepositMoney(100)
// Events are saved, and a snapshot may be created and saved.
_, _, err := repo.Save(ctx, account)
Returns the new version of the aggregate, the list of committed events, and an error if event persistence fails, or if snapshot persistence fails and the configured `OnSnapshotError` handler returns the error.
type ESRepoWithSnapshotsOption ¶
type ESRepoWithSnapshotsOption func(esRepoWithSnapshotsConfigurator)
func OnSnapshotError ¶
func OnSnapshotError(fn OnSnapshotErrFunc) ESRepoWithSnapshotsOption
OnSnapshotError provides an option to set a custom handler for snapshot save errors.
Usage:
// Configure the repository to log and ignore snapshot save errors
repo, err := aggregate.NewESRepoWithSnapshots(
...,
aggregate.OnSnapshotError(func(ctx context.Context, err error) error {
log.Printf("Failed to save snapshot: %v", err)
return nil // nil means the error is handled and won't be returned by Save
}),
)
func SnapshotSaveEnabled ¶
func SnapshotSaveEnabled(enabled bool) ESRepoWithSnapshotsOption
SnapshotSaveEnabled provides an option to enable or disable the saving of snapshots. This can be useful for diagnostics or for scenarios where snapshotting is controlled by a separate process. The default is true.
Usage:
// Disable snapshot saving for this repository instance.
repo, err := aggregate.NewESRepoWithSnapshots(
...,
aggregate.SnapshotSaveEnabled(false),
)
type FusedRepo ¶
type FusedRepo[TID ID, E event.Any, R Root[TID, E]] struct {
	AggregateLoader[TID, E, R]
	VersionedGetter[TID, E, R]
	Getter[TID, E, R]
	Saver[TID, E, R]
}
FusedRepo is a convenience type that implements the Repository interface by composing its constituent parts. It is useful for creating repository decorators, such as a retry wrapper, where you might only want to override one behavior (like Save) while keeping the others.
Usage:
// Wrap a standard repository's Save method with a custom retry mechanism
baseRepo := ...
repoWithRetry := &FusedRepo[...]{
AggregateLoader: baseRepo,
VersionedGetter: baseRepo,
Getter: baseRepo,
Saver: &MyCustomSaverWithRetry{saver: baseRepo},
}
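The composition trick behind this type is plain Go interface embedding. The sketch below uses hypothetical, trimmed-down interfaces (`getter`, `saver`) and a made-up `countingSaver` decorator to show how one behavior can be replaced while the rest is forwarded unchanged; it does not use the package's real generic types.

```go
package main

import "fmt"

// getter and saver are hypothetical, trimmed-down versions of the
// repository's constituent interfaces.
type getter interface {
	Get(id string) (string, error)
}

type saver interface {
	Save(v string) error
}

// fused composes the two by embedding, so each half can come from a
// different implementation.
type fused struct {
	getter
	saver
}

// memRepo is a hypothetical base repository.
type memRepo struct{ data map[string]string }

func (m *memRepo) Get(id string) (string, error) { return m.data[id], nil }
func (m *memRepo) Save(v string) error           { m.data[v] = v; return nil }

// countingSaver decorates Save and leaves everything else untouched.
type countingSaver struct {
	saver
	calls int
}

func (c *countingSaver) Save(v string) error {
	c.calls++
	return c.saver.Save(v)
}

func main() {
	base := &memRepo{data: map[string]string{}}
	counted := &countingSaver{saver: base}
	repo := fused{getter: base, saver: counted}

	_ = repo.Save("a")
	got, _ := repo.Get("a")
	fmt.Println(got, counted.calls)
}
```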
type Getter ¶
type Getter[TID ID, E event.Any, R Root[TID, E]] interface {
	Get(ctx context.Context, id TID) (R, error)
}
Getter is responsible for retrieving the latest state of an aggregate root. It loads all events for the given ID and replays them to reconstruct the aggregate.
Usage:
// repo implements Getter
account, err := repo.Get(ctx, "account-123")
Returns the fully hydrated aggregate root or an error if the aggregate is not found or loading fails.
type ID ¶
ID is a constraint for an aggregate's identifier. Any type used as an ID must implement the `fmt.Stringer` interface.
type OnSnapshotErrFunc ¶
OnSnapshotErrFunc defines the signature for a function that handles errors occurring during the saving of a snapshot.
Since snapshots are a performance optimization and not the source of truth, it is often acceptable to log the error and continue without returning it, preventing a snapshot failure from failing the entire operation.
Usage:
// Configure the repository to log snapshot errors but not fail the Save operation.
option := aggregate.OnSnapshotError(func(ctx context.Context, err error) error {
log.Printf("Warning: failed to save snapshot: %v", err)
return nil // Returning nil ignores the error.
})
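How a handler's return value shapes the outcome of a save can be shown with a simplified outline. `saveWithSnapshot` and `errSnapshot` are hypothetical, and the handler here omits the `context.Context` parameter of the real signature to keep the sketch short.

```go
package main

import (
	"errors"
	"fmt"
)

// errSnapshot is a hypothetical snapshot-store failure.
var errSnapshot = errors.New("snapshot store down")

// saveWithSnapshot is a hypothetical outline of a Save that persists events
// first, then tries to snapshot, and routes snapshot failures to onErr.
func saveWithSnapshot(
	commit func() error,
	saveSnapshot func() error,
	onErr func(error) error,
) error {
	if err := commit(); err != nil {
		return err // event persistence failures always surface
	}
	if err := saveSnapshot(); err != nil {
		return onErr(err) // nil swallows the failure, non-nil propagates it
	}
	return nil
}

func main() {
	ok := func() error { return nil }
	broken := func() error { return errSnapshot }

	ignore := func(err error) error { return nil }
	strict := func(err error) error { return fmt.Errorf("snapshot: %w", err) }

	fmt.Println(saveWithSnapshot(ok, broken, ignore))
	fmt.Println(saveWithSnapshot(ok, broken, strict))
}
```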
type ProcessorChain ¶
type ProcessorChain[TX any, TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}
ProcessorChain chains multiple TransactionalAggregateProcessor instances together, executing them sequentially as a single unit. It implements the TransactionalAggregateProcessor interface itself, allowing it to be used anywhere a single processor is expected.
This is useful for composing multiple transactional side-effects for a single aggregate save operation, such as updating a read model *and* creating an outbox entry in the same transaction.
If any processor in the chain returns an error, execution stops immediately, and the error is returned. This ensures that the entire chain's operations, and thus the parent transaction, are atomic.
Example:
// Create individual processors
outboxProcessor := &OutboxProcessor{}
projectionProcessor := &ProjectionProcessor{}
// Combine them into a single chain
processorChain := NewProcessorChain(
outboxProcessor,
projectionProcessor,
)
// Use the chain with a transactional repository
repo, err := NewTransactionalRepository(
db,
account.NewEmpty,
nil,
processorChain, // The chain acts as a single processor
)
func NewProcessorChain ¶
func NewProcessorChain[TX any, TID ID, E event.Any, R Root[TID, E]]( processors ...TransactionalAggregateProcessor[TX, TID, E, R], ) ProcessorChain[TX, TID, E, R]
NewProcessorChain creates a new ProcessorChain from the provided processors. The processors will be executed by the chain's Process method in the same order they are provided here.
func (ProcessorChain[TX, TID, E, R]) Process ¶
func (pc ProcessorChain[TX, TID, E, R]) Process( ctx context.Context, tx TX, root R, events CommittedEvents[E], ) error
Process executes each processor in the chain sequentially. It satisfies the TransactionalAggregateProcessor interface.
Each processor is called with the same context, transaction handle, aggregate root, and committed events.
If any processor returns an error, execution of the chain is halted, and the error is returned immediately. This will cause the parent transaction to be rolled back. If all processors complete successfully, it returns nil.
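The halt-on-first-error behavior is easy to see in a reduced form. In this sketch `processor` and `chain` are hypothetical simplifications (no context, transaction handle, or aggregate root), kept only to show the ordering and the early return.

```go
package main

import (
	"errors"
	"fmt"
)

// processor is a hypothetical, simplified processor signature.
type processor func(events []string) error

// chain runs processors in the order given and stops at the first error.
func chain(ps ...processor) processor {
	return func(events []string) error {
		for _, p := range ps {
			if err := p(events); err != nil {
				return err
			}
		}
		return nil
	}
}

// errProjection is a hypothetical failure from the second processor.
var errProjection = errors.New("projection failed")

func main() {
	var ran []string
	outbox := func([]string) error { ran = append(ran, "outbox"); return nil }
	projection := func([]string) error { ran = append(ran, "projection"); return errProjection }
	audit := func([]string) error { ran = append(ran, "audit"); return nil }

	err := chain(outbox, projection, audit)([]string{"moneyDeposited"})
	fmt.Println(ran, err) // audit never runs
}
```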
type Repository ¶
type Repository[TID ID, E event.Any, R Root[TID, E]] interface {
	AggregateLoader[TID, E, R]
	VersionedGetter[TID, E, R]
	Getter[TID, E, R]
	Saver[TID, E, R]
}
Repository combines the core operations for loading and saving an aggregate root. It is the primary interface for interacting with aggregates.
type Root ¶
type Root[TID ID, E event.Any] interface {
	Aggregate[TID, E]
	Versioner
	event.EventFuncCreator[E]
	// contains filtered or unexported methods
}
Root represents the complete contract for an aggregate root in this framework. It combines the `Aggregate`, `Versioner`, and `event.EventFuncCreator` interfaces, along with internal methods for event handling. The easiest way to satisfy this interface is to embed `aggregate.Base` in your struct.
type Saver ¶
type Saver[TID ID, E event.Any, R Root[TID, E]] interface {
	Save(ctx context.Context, root R) (version.Version, CommittedEvents[E], error)
}
Saver is responsible for persisting the uncommitted events of an aggregate root.
Usage:
// repo implements Saver
newVersion, committedEvents, err := repo.Save(ctx, myAccount)
Returns the new version of the aggregate, the events that were committed, and an error if saving fails (e.g., a `version.ConflictError`).
type Snapshot ¶
Snapshot represents the contract for an encoded, point-in-time representation of an aggregate's state. Implementations of this interface hold the necessary data to reconstruct an aggregate, along with its ID and the version at which the snapshot was taken.
Usage:
type AccountSnapshot struct {
AccountID account.AccountID
AggregateVersion version.Version
Balance int
}
func (s *AccountSnapshot) ID() account.AccountID { return s.AccountID }
func (s *AccountSnapshot) Version() version.Version { return s.AggregateVersion }
type SnapshotPolicy ¶ added in v0.5.0
type SnapshotPolicy[TID ID, E event.Any, R Root[TID, E]] interface {
	ShouldSnapshot(
		ctx context.Context,
		root R,
		previousVersion version.Version,
		newVersion version.Version,
		committedEvents CommittedEvents[E],
	) bool
}
SnapshotPolicy defines the policy for when to create a snapshot of an aggregate. Implementations of this interface allow for flexible and domain-specific rules to decide if a snapshot is wanted after a successful save operation.
Usage:
// This interface is used when configuring a repository with snapshotting.
type myCustomPolicy struct {}
func (s *myCustomPolicy) ShouldSnapshot(...) bool {
// custom logic
return true
}
repoWithSnaps, _ := chronicle.NewEventSourcedRepositoryWithSnapshots(
baseRepo,
snapStore,
snapshotter,
&myCustomPolicy{},
)
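As an illustration of what a version-based rule might look like, the sketch below implements one plausible reading of an "every N events" policy: snapshot whenever a save moves the aggregate across a multiple of N. The `everyN` type and its two-argument `ShouldSnapshot` are hypothetical and deliberately simpler than the interface above; the built-in policy may define the rule differently.

```go
package main

import "fmt"

// everyN is a hypothetical policy that asks for a snapshot whenever a save
// moves the aggregate across a multiple of N versions.
type everyN struct{ N uint64 }

// ShouldSnapshot compares how many multiples of N lie at or below each
// version; a larger count for newVersion means a boundary was crossed.
func (p everyN) ShouldSnapshot(previousVersion, newVersion uint64) bool {
	if p.N == 0 {
		return false
	}
	return newVersion/p.N > previousVersion/p.N
}

func main() {
	p := everyN{N: 10}
	fmt.Println(p.ShouldSnapshot(8, 9))   // no boundary crossed
	fmt.Println(p.ShouldSnapshot(9, 10))  // reaches version 10
	fmt.Println(p.ShouldSnapshot(18, 23)) // crosses version 20 in one save
}
```

Comparing quotients rather than testing `newVersion % N == 0` avoids missing a boundary when a single save commits several events.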
type SnapshotStore ¶
type SnapshotStore[TID ID, TS Snapshot[TID]] interface {
	SaveSnapshot(ctx context.Context, snapshot TS) error

	// GetSnapshot retrieves the latest snapshot for a given aggregate ID.
	//
	// Returns the snapshot, a boolean indicating if it was found, and an error if one occurred.
	GetSnapshot(ctx context.Context, aggregateID TID) (TS, bool, error)
}
SnapshotStore defines the contract for a storage mechanism for snapshots. Implementations are responsible for persisting and retrieving snapshot data, for example, in memory, in a database table, or in a key-value store.
Usage:
// snapStore is an implementation of SnapshotStore, e.g., snapshotstore.NewMemory()
err := snapStore.SaveSnapshot(ctx, mySnapshot)
if err != nil {
// handle error
}
snapshot, found, err := snapStore.GetSnapshot(ctx, aggregateID)
if err != nil {
// handle error
}
if found {
// use the snapshot
}
type Snapshotter ¶
type Snapshotter[TID ID, E event.Any, R Root[TID, E], TS Snapshot[TID]] interface {
	ToSnapshot(R) (TS, error)
	FromSnapshot(TS) (R, error)
}
Snapshotter defines the mechanism for converting a live aggregate root into its snapshot representation and back. It acts as a bridge between the domain object and its persisted state.
Usage:
type AccountSnapshotter struct{}
func (s *AccountSnapshotter) ToSnapshot(acc *account.Account) (*account.Snapshot, error) {
return &account.Snapshot{
AccountID: acc.ID(),
AggregateVersion: acc.Version(),
Balance: acc.Balance(),
}, nil
}
func (s *AccountSnapshotter) FromSnapshot(snap *account.Snapshot) (*account.Account, error) {
// Recreate the aggregate from snapshot data
acc := account.NewEmpty()
// ... set fields from snap ...
return acc, nil
}
type TransactionalAggregateProcessor ¶
type TransactionalAggregateProcessor[TX any, TID ID, E event.Any, R Root[TID, E]] interface {
	// Process is called by the TransactionalRepository *inside* an active transaction,
	// immediately after the aggregate's events have been successfully saved to the event log.
	// It receives the transaction handle, the aggregate in its new state, and the
	// strongly-typed events that were just committed.
	//
	// Returns an error if processing fails. This will cause the entire transaction to be
	// rolled back, including the saving of the events. Returns nil on success.
	Process(ctx context.Context, tx TX, root R, events CommittedEvents[E]) error
}
TransactionalAggregateProcessor defines a contract for processing an aggregate and its committed events within the same transaction as the save operation. This is a high-level, type-safe hook that is useful for atomically updating read models (projections) or creating outbox messages.
The 'Process' method is called by a TransactionalRepository *inside* an active transaction, immediately after the aggregate's events have been successfully saved to the event log. This guarantees that the event log write and any side effects performed by the processor (like updating a projection table or inserting an outbox message) either all succeed or all fail together.
TX is the transaction handle type (e.g., *sql.Tx). TID is the aggregate's ID type. E is the aggregate's base event type. R is the aggregate root type.
Usage (Outbox Pattern with *sql.Tx):
// Assume an outbox table:
// CREATE TABLE outbox_messages (id SERIAL PRIMARY KEY, event_name TEXT, payload JSONB);
import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"

	"github.com/DeluxeOwl/chronicle/examples/internal/account"
)
type OutboxProcessor struct {
	// ... dependencies like a logger
}

func (p *OutboxProcessor) Process(
	ctx context.Context,
	tx *sql.Tx, // The active database transaction
	root *account.Account,
	events CommittedEvents[account.AccountEvent],
) error {
	stmt, err := tx.PrepareContext(ctx, "INSERT INTO outbox_messages (event_name, payload) VALUES ($1, $2)")
	if err != nil {
		return fmt.Errorf("prepare outbox insert: %w", err)
	}
	defer stmt.Close()

	for evt := range events.All() {
		payload, err := json.Marshal(evt)
		if err != nil {
			return fmt.Errorf("marshal event %s for outbox: %w", evt.EventName(), err)
		}
		if _, err := stmt.ExecContext(ctx, evt.EventName(), payload); err != nil {
			return fmt.Errorf("insert event %s into outbox: %w", evt.EventName(), err)
		}
	}
	return nil
}
// Then, wire it into a transactional repository:
// outboxProcessor := &OutboxProcessor{}
// repo, err := NewTransactionalRepository(
// postgresEventLog, // a transactional event log
// account.NewEmpty,
// nil, // transformers
// outboxProcessor,
// )
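The all-or-nothing guarantee described above can be illustrated without a database. The following is a minimal, stdlib-only sketch, not the library's implementation: memTx, memStore, processor, and saveAtomically are hypothetical names invented for this example, and the "transaction" is just a staging buffer that is either applied or discarded.

```go
package main

import (
	"errors"
	"fmt"
)

// memTx is a hypothetical in-memory stand-in for a transaction handle such as *sql.Tx.
// Writes are staged here and only become visible if the transaction commits.
type memTx struct {
	events []string
	outbox []string
}

// memStore is a hypothetical store holding the committed event log and outbox.
type memStore struct {
	events []string
	outbox []string
}

// processor mirrors the role of a Process hook: it runs inside the transaction
// after the events are staged, and a non-nil error aborts the whole save.
type processor func(tx *memTx, committed []string) error

// saveAtomically stages the events, runs the processor, and commits only if both succeed.
func (s *memStore) saveAtomically(events []string, p processor) error {
	tx := &memTx{}
	tx.events = append(tx.events, events...)
	if err := p(tx, events); err != nil {
		// Rollback: the staged writes are discarded and the store is untouched.
		return fmt.Errorf("process committed events: %w", err)
	}
	// Commit: event log and outbox become visible together.
	s.events = append(s.events, tx.events...)
	s.outbox = append(s.outbox, tx.outbox...)
	return nil
}

// outboxProcessor stages one outbox message per committed event.
func outboxProcessor(tx *memTx, committed []string) error {
	for _, name := range committed {
		tx.outbox = append(tx.outbox, "outbox:"+name)
	}
	return nil
}

// failingProcessor simulates a side effect that cannot be completed.
func failingProcessor(_ *memTx, _ []string) error {
	return errors.New("projection unavailable")
}

func main() {
	store := &memStore{}
	err := store.saveAtomically([]string{"account/opened"}, outboxProcessor)
	fmt.Println(err, len(store.events), len(store.outbox))

	err = store.saveAtomically([]string{"account/deposited"}, failingProcessor)
	fmt.Println(err != nil, len(store.events), len(store.outbox))
}
```

In this sketch the second save leaves both slices unchanged, which is the behavior a real processor relies on when it returns an error from Process.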
type TransactionalRepository ¶
type TransactionalRepository[T any, TID ID, E event.Any, R Root[TID, E]] struct {
	// contains filtered or unexported fields
}
TransactionalRepository is a repository implementation that ensures that saving an aggregate's events and processing them (e.g., for updating projections) occur within a single, atomic transaction. It orchestrates operations using an `event.Transactor` and an `event.TransactionalLog`, and can execute a type-safe `TransactionalAggregateProcessor` as part of the same transaction. This is useful for implementing strongly consistent read models or the transactional outbox pattern.
func NewTransactionalRepository ¶
func NewTransactionalRepository[TX any, TID ID, E event.Any, R Root[TID, E]](
	log event.TransactionalEventLog[TX],
	createRoot func() R,
	transformers []event.Transformer[E],
	aggProcessor TransactionalAggregateProcessor[TX, TID, E, R],
	opts ...ESRepoOption,
) (*TransactionalRepository[TX, TID, E, R], error)
NewTransactionalRepository creates a repository that manages operations within an atomic transaction. This constructor is a convenience for when the event log implementation (like the provided Postgres or SQLite logs) also serves as the transaction manager by implementing `event.TransactionalEventLog`.
Usage:
// postgresLog implements event.TransactionalEventLog[*sql.Tx]
// myProcessor implements aggregate.TransactionalAggregateProcessor for *sql.Tx
repo, err := aggregate.NewTransactionalRepository(
	postgresLog,
	account.NewEmpty,
	nil, // no transformers
	myProcessor,
)
Returns a new `*TransactionalRepository` configured for atomic operations, or an error if setup fails.
func NewTransactionalRepositoryWithTransactor ¶
func NewTransactionalRepositoryWithTransactor[TX any, TID ID, E event.Any, R Root[TID, E]](
	transactor event.Transactor[TX],
	txLog event.TransactionalLog[TX],
	createRoot func() R,
	transformers []event.Transformer[E],
	aggProcessor TransactionalAggregateProcessor[TX, TID, E, R],
	opts ...ESRepoOption,
) (*TransactionalRepository[TX, TID, E, R], error)
NewTransactionalRepositoryWithTransactor creates a transactional repository with a separate transactor and log. This constructor provides more flexibility by decoupling the transaction management from the event storage logic. It is useful in advanced scenarios where you might use a generic transaction coordinator.
Usage:
// myTransactor implements event.Transactor[*sql.Tx]
// myTxLog implements event.TransactionalLog[*sql.Tx]
repo, err := aggregate.NewTransactionalRepositoryWithTransactor(
	myTransactor,
	myTxLog,
	account.NewEmpty,
	nil, // no transformers
	myProcessor,
)
Returns a new `*TransactionalRepository`, or an error if setup fails.
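To make the decoupling concrete, here is a small stdlib-only sketch of the idea: one component decides when a transaction commits or rolls back, another only knows how to write events through a transaction it is handed. The interfaces transactor and txLog below are hypothetical, simplified shapes written for this example; they are not the definitions of `event.Transactor` or `event.TransactionalLog`, and every other name is likewise an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// transactor is a hypothetical, simplified transaction manager: it opens a
// transaction, runs fn, and commits only when fn returns nil.
type transactor[TX any] interface {
	withinTx(fn func(tx TX) error) error
}

// txLog is a hypothetical, simplified log that appends through a caller-owned transaction.
type txLog[TX any] interface {
	appendInTx(tx TX, events []string) error
}

// stagedTx buffers writes until the transactor commits them.
type stagedTx struct{ staged []string }

// memTransactor owns the committed state and decides between commit and rollback.
type memTransactor struct{ committed []string }

func (m *memTransactor) withinTx(fn func(tx *stagedTx) error) error {
	tx := &stagedTx{}
	if err := fn(tx); err != nil {
		return err // rollback: staged writes are dropped
	}
	m.committed = append(m.committed, tx.staged...)
	return nil
}

// memLog knows how to write events but not how transactions begin or end.
type memLog struct{}

func (memLog) appendInTx(tx *stagedTx, events []string) error {
	if len(events) == 0 {
		return errors.New("no events to append")
	}
	tx.staged = append(tx.staged, events...)
	return nil
}

// save composes the two collaborators, roughly as a repository might.
func save[TX any](t transactor[TX], l txLog[TX], events []string) error {
	return t.withinTx(func(tx TX) error {
		return l.appendInTx(tx, events)
	})
}

func main() {
	mgr := &memTransactor{}
	fmt.Println(save[*stagedTx](mgr, memLog{}, []string{"account/opened"}))
	fmt.Println(save[*stagedTx](mgr, memLog{}, nil), len(mgr.committed))
}
```

Because the two roles only meet through the TX type parameter, either side can be swapped independently as long as both agree on the transaction handle type.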
func (*TransactionalRepository[TX, TID, E, R]) Get ¶
func (repo *TransactionalRepository[TX, TID, E, R]) Get(ctx context.Context, id TID) (R, error)
func (*TransactionalRepository[TX, TID, E, R]) GetVersion ¶
func (*TransactionalRepository[TX, TID, E, R]) LoadAggregate ¶
func (*TransactionalRepository[TX, TID, E, R]) Save ¶
func (repo *TransactionalRepository[TX, TID, E, R]) Save(ctx context.Context, root R) (version.Version, CommittedEvents[E], error)
Save atomically persists the aggregate's uncommitted events and executes the configured transactional processor within a single database transaction. If any part of the process fails, the entire transaction is rolled back, ensuring data consistency.
Usage:
// acc is an aggregate with uncommitted events
newVersion, committedEvents, err := transactionalRepo.Save(ctx, acc)
if err != nil {
	// The transaction was rolled back.
	// Handle the conflict or transient error.
}
Returns the aggregate's new version and the list of committed events on success. Returns an error if saving the events or executing the processor fails.
type UncommittedEvents ¶
UncommittedEvents represents a strongly-typed slice of events that have been recorded on an aggregate but not yet persisted to the event log.
func FlushUncommittedEvents ¶
func FlushUncommittedEvents[TID ID, E event.Any, R Root[TID, E]](root R) UncommittedEvents[E]
FlushUncommittedEvents clears the pending events from an aggregate and returns them as a strongly-typed slice. This is the first step in the `Save` process.
Usage (called by `CommitEvents`):
uncommitted := aggregate.FlushUncommittedEvents(root)
Returns a slice of the uncommitted events.
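The flush semantics (return the pending events, then leave the aggregate with none) can be sketched with a tiny stand-in type. This is an illustration under assumptions, not the package's code: pendingRoot, record, and flush are hypothetical names, and events are plain strings here.

```go
package main

import "fmt"

// pendingRoot is a hypothetical aggregate base that buffers recorded events.
type pendingRoot[E any] struct {
	uncommitted []E
}

// record appends an event to the pending buffer without persisting it.
func (r *pendingRoot[E]) record(e E) {
	r.uncommitted = append(r.uncommitted, e)
}

// flush hands the buffered events to the caller and resets the buffer,
// so a second flush without new records returns nothing.
func (r *pendingRoot[E]) flush() []E {
	flushed := r.uncommitted
	r.uncommitted = nil
	return flushed
}

func main() {
	root := &pendingRoot[string]{}
	root.record("account/opened")
	root.record("account/deposited")

	first := root.flush()
	second := root.flush()
	fmt.Println(len(first), len(second))
}
```

One consequence worth keeping in mind: once events have been flushed, the caller owns them, so they are lost if the caller fails to persist them and does not re-record them.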
type VersionedGetter ¶
type VersionedGetter[TID ID, E event.Any, R Root[TID, E]] interface { GetVersion(ctx context.Context, id TID, selector version.Selector) (R, error) }
VersionedGetter is responsible for retrieving an aggregate root at a specific version. It loads and replays events up to the version specified in the selector.
Usage:
// repo implements VersionedGetter
// Get the state of the account after its 5th event
account, err := repo.GetVersion(ctx, "account-123", version.Selector{From: 1, To: 5})
Returns the hydrated aggregate root at the specified version or an error.
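Conceptually, loading at a version means folding the event stream and stopping at the requested upper bound. The sketch below shows that idea with stdlib Go only; balanceEvent and replayUpTo are hypothetical names, versions are assumed to be 1-based positions in the stream, and none of this reflects how the repository is actually implemented.

```go
package main

import "fmt"

// balanceEvent is a hypothetical event: a signed change to an account balance.
type balanceEvent struct{ delta int }

// replayUpTo applies events in order until version `to` (1-based, inclusive)
// has been applied, and reports the resulting balance and the version reached.
func replayUpTo(events []balanceEvent, to int) (balance int, version int) {
	for i, e := range events {
		if i+1 > to {
			break
		}
		balance += e.delta
		version = i + 1
	}
	return balance, version
}

func main() {
	events := []balanceEvent{{100}, {-30}, {50}}

	balance, version := replayUpTo(events, 2)
	fmt.Println(balance, version)

	// Asking for a version beyond the stream replays everything available.
	balance, version = replayUpTo(events, 10)
	fmt.Println(balance, version)
}
```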