Documentation ¶
Index ¶
- Constants
- Variables
- func AnyToConcrete[E Any](event Any) (E, bool)
- type Any
- type AnyTransformer
- type Appender
- type AsyncProjection
- type AsyncProjectionRunner
- type AsyncProjectionRunnerOption
- func WithCancelCheckpointTimeout(timeout time.Duration) AsyncProjectionRunnerOption
- func WithCheckpointPolicy(policy CheckpointPolicy) AsyncProjectionRunnerOption
- func WithLastCheckpointSaveEnabled(enabled bool) AsyncProjectionRunnerOption
- func WithPollInterval(interval time.Duration) AsyncProjectionRunnerOption
- func WithSaveCheckpointErrPolicy(errFunc OnSaveCheckpointErrFunc) AsyncProjectionRunnerOption
- func WithSlogHandler(handler slog.Handler) AsyncProjectionRunnerOption
- func WithTailing(enabled bool) AsyncProjectionRunnerOption
- type CheckpointInfo
- type CheckpointPolicy
- type Checkpointer
- type DeleterLog
- type EventFuncCreator
- type EventFuncGetter
- type EventRegistrar
- type EventRegistry
- type FuncFor
- type FuncsFor
- type GlobalLog
- type GlobalReader
- type GlobalRecord
- type GlobalRecordOrError
- type GlobalRecords
- type Log
- type LogID
- type OnSaveCheckpointErrFunc
- type Raw
- type RawEvents
- type Reader
- type Record
- type RecordOrError
- type Records
- type Registry
- type SyncProjection
- type TransactableLog
- func (l *TransactableLog[TX]) AppendEvents(ctx context.Context, id LogID, expected version.Check, events RawEvents) (version.Version, error)
- func (l *TransactableLog[TX]) AppendInTx(ctx context.Context, tx TX, id LogID, expected version.Check, events RawEvents) (version.Version, []*Record, error)
- func (l *TransactableLog[TX]) ReadEvents(ctx context.Context, id LogID, selector version.Selector) Records
- func (l *TransactableLog[TX]) WithinTx(ctx context.Context, fn func(ctx context.Context, tx TX) error) error
- type TransactionalEventLog
- type TransactionalLog
- type Transactor
- type Transformer
- type TransformerChain
Constants ¶
const (
	DefaultPollInterval            = 200 * time.Millisecond
	DefaultCancelCheckpointTimeout = 5 * time.Second
)
Variables ¶
var (
	ContinueOnError = func(ctx context.Context, err error) error { return nil }
	StopOnError     = func(ctx context.Context, err error) error { return err }
)
var GlobalRegistry = NewRegistry[Any]()
Functions ¶
func AnyToConcrete ¶
AnyToConcrete is a generic helper function that safely performs a type assertion from the abstract event.Any interface to a specific, concrete event type `E`. This is typically used after decoding an event from the event store.
Usage:
var anyEvent event.Any = &moneyDeposited{Amount: 100}
if depositedEvent, ok := event.AnyToConcrete[*moneyDeposited](anyEvent); ok {
fmt.Printf("Deposited: %d\n", depositedEvent.Amount)
}
Returns the concrete event of type `E` and `true` if the assertion is successful. If the assertion fails, it returns the zero value for `E` and `false`.
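Under the hood this is just a generic comma-ok type assertion. The following is a self-contained sketch: `Any`, `moneyDeposited`, and `anyToConcrete` are local stand-ins mirroring the documented shapes, not the package's own code.

```go
package main

import "fmt"

// Any mirrors the event.Any interface for this self-contained sketch.
type Any interface{ EventName() string }

type moneyDeposited struct{ Amount int }

func (*moneyDeposited) EventName() string { return "account/money_deposited" }

// anyToConcrete sketches what event.AnyToConcrete does: a comma-ok
// type assertion from the abstract interface to a concrete type E.
func anyToConcrete[E Any](ev Any) (E, bool) {
	e, ok := ev.(E)
	return e, ok
}

func main() {
	var ev Any = &moneyDeposited{Amount: 100}
	if d, ok := anyToConcrete[*moneyDeposited](ev); ok {
		fmt.Println(d.Amount) // 100
	}
	if _, ok := anyToConcrete[*moneyDeposited](nil); !ok {
		fmt.Println("not a moneyDeposited")
	}
}
```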
Types ¶
type Any ¶
type Any interface {
EventName() string
}
Any is the fundamental interface that all event types in the system must implement. It ensures that every event has a distinct name, which is used for encoding, decoding, and routing within the framework.
Usage:
type moneyDeposited struct {
Amount int `json:"amount"`
}
// This method satisfies the event.Any interface.
func (*moneyDeposited) EventName() string { return "account/money_deposited" }
type AnyTransformer ¶
type AnyTransformer Transformer[Any]
AnyTransformer is an alias for a Transformer that operates on the generic `event.Any` interface. This is useful for creating transformers that are not tied to a specific aggregate's event type, such as a global encryption or logging transformer.
type Appender ¶
type Appender interface {
AppendEvents(
ctx context.Context,
id LogID,
expected version.Check,
events RawEvents,
) (version.Version, error)
}
Appender is responsible for writing new events to an aggregate's log atomically. It uses an optimistic concurrency check via the `expected` version parameter to prevent race conditions.
Usage:
// store is an implementation of event.Log
// rawEvents is a slice of event.Raw
// expectedVersion is the version of the aggregate before new events are applied
newVersion, err := store.AppendEvents(ctx, "account-123", version.CheckExact(expectedVersion), rawEvents)
Returns the new version of the aggregate after the events have been successfully appended. If the optimistic concurrency check fails, it returns a `version.ConflictError`.
type AsyncProjection ¶ added in v0.5.0
type AsyncProjection interface {
MatchesEvent(eventName string) bool
// Handle processes a single event from the global stream.
// Checkpoint is saved based on the configured CheckpointPolicy.
Handle(ctx context.Context, rec *GlobalRecord) error
}
AsyncProjection processes events asynchronously from the global event stream. It processes one event at a time with explicit checkpoint management for resumability and fault tolerance.
Use cases:
- Eventually consistent projections
- Cross-service integration
- Analytics and reporting
- Email notifications, etc.
type AsyncProjectionRunner ¶ added in v0.5.0
type AsyncProjectionRunner struct {
// contains filtered or unexported fields
}
func NewAsyncProjectionRunner ¶ added in v0.5.0
func NewAsyncProjectionRunner(
	log GlobalLog,
	checkpointer Checkpointer,
	projection AsyncProjection,
	projectionName string,
	opts ...AsyncProjectionRunnerOption,
) (*AsyncProjectionRunner, error)
type AsyncProjectionRunnerOption ¶ added in v0.5.0
type AsyncProjectionRunnerOption func(*AsyncProjectionRunner)
func WithCancelCheckpointTimeout ¶ added in v0.5.0
func WithCancelCheckpointTimeout(timeout time.Duration) AsyncProjectionRunnerOption
func WithCheckpointPolicy ¶ added in v0.5.0
func WithCheckpointPolicy(policy CheckpointPolicy) AsyncProjectionRunnerOption
func WithLastCheckpointSaveEnabled ¶ added in v0.5.0
func WithLastCheckpointSaveEnabled(enabled bool) AsyncProjectionRunnerOption
func WithPollInterval ¶ added in v0.5.0
func WithPollInterval(interval time.Duration) AsyncProjectionRunnerOption
func WithSaveCheckpointErrPolicy ¶ added in v0.5.0
func WithSaveCheckpointErrPolicy(
	errFunc OnSaveCheckpointErrFunc,
) AsyncProjectionRunnerOption
func WithSlogHandler ¶ added in v0.5.0
func WithSlogHandler(handler slog.Handler) AsyncProjectionRunnerOption
func WithTailing ¶ added in v0.5.0
func WithTailing(enabled bool) AsyncProjectionRunnerOption
WithTailing configures the runner to use a blocking strategy suitable for event logs that wait for new events (like a channel or a NATS subscription) instead of polling. When set to true, the PollInterval is ignored.
type CheckpointInfo ¶ added in v0.5.0
type CheckpointPolicy ¶ added in v0.5.0
type CheckpointPolicy interface {
// ShouldCheckpoint is called after each event is processed to decide if it's time to save a checkpoint.
ShouldCheckpoint(info CheckpointInfo) bool
}
CheckpointPolicy determines when a checkpoint should be saved.
func AfterDuration ¶ added in v0.5.0
func AfterDuration(d time.Duration) CheckpointPolicy
AfterDuration returns a policy that triggers a checkpoint after a given duration has passed since the last checkpoint. A checkpoint is also always saved at the end of a processed batch.
func AnyOf ¶ added in v0.5.0
func AnyOf(policies ...CheckpointPolicy) CheckpointPolicy
AnyOf returns a policy that triggers a checkpoint if any of the provided policies trigger. This allows you to combine policies, e.g., "Every 100 events OR after 5 seconds".
func EveryNEvents ¶ added in v0.5.0
func EveryNEvents(n int) CheckpointPolicy
EveryNEvents returns a policy that triggers a checkpoint after N events have been processed. A checkpoint is also always saved at the end of a processed batch.
type Checkpointer ¶ added in v0.5.0
type DeleterLog ¶ added in v0.5.0
type DeleterLog interface {
DangerouslyDeleteEventsUpTo(
ctx context.Context,
id LogID,
version version.Version,
) error
}
⚠️⚠️⚠️ WARNING: Read carefully
DeleterLog permanently deletes all events for a specific log ID up to and INCLUDING the specified version.
This operation is irreversible and breaks the immutability of the event log.
It is intended for use cases such as manually pruning event streams, and should be used with extreme caution.
Rebuilding aggregates or projections after this operation may lead to an inconsistent state.
It is recommended to generate a snapshot event of your aggregate state before running this. Remember to also invalidate projections that depend on the deleted events, as well as any snapshots older than the version you pass to this function.
type EventFuncCreator ¶
EventFuncCreator is an interface that an aggregate root must implement. It provides a way for the aggregate to declare all of its possible event types to the event registry.
Usage:
// An aggregate implements this interface.
type Account struct { /* ... */ }
func (a *Account) EventFuncs() event.FuncsFor[AccountEvent] { /* ... */ }
type EventFuncGetter ¶
EventFuncGetter is responsible for the "read" side of a registry. It defines the contract for retrieving a specific event factory function by its name.
type EventRegistrar ¶
type EventRegistrar[E Any] interface {
	RegisterEvents(root EventFuncCreator[E]) error
}
EventRegistrar is responsible for the "write" side of a registry. It defines the contract for registering event factory functions from an aggregate.
type EventRegistry ¶
type EventRegistry[E Any] struct {
	// contains filtered or unexported fields
}
EventRegistry is a thread-safe implementation of the Registry interface. It uses a map to store event factories and a RWMutex to handle concurrent access.
func NewRegistry ¶
func NewRegistry[E Any]() *EventRegistry[E]
NewRegistry creates a new, empty event registry for a specific event base type.
Usage:
registry := event.NewRegistry[account.AccountEvent]()
Returns a pointer to a new EventRegistry.
func (*EventRegistry[E]) GetFunc ¶
func (r *EventRegistry[E]) GetFunc(eventName string) (FuncFor[E], bool)
GetFunc retrieves an event factory function from the registry using its unique name. The framework uses this during decoding to create a concrete event instance before unmarshaling the data.
Usage:
factory, found := registry.GetFunc("account/money_deposited")
if found {
event := factory() // returns a new(moneyDeposited)
}
Returns the factory function and a boolean indicating whether the name was found.
func (*EventRegistry[E]) RegisterEvents ¶
func (r *EventRegistry[E]) RegisterEvents(root EventFuncCreator[E]) error
RegisterEvents populates the registry with the event factories from an aggregate. It iterates through the functions provided by the EventFuncCreator and maps them by their event name. This is typically called once during application startup. The default repositories call this function for you.
Usage:
accountAggregate := account.NewEmpty()
err := registry.RegisterEvents(accountAggregate)
Returns an error if an event with the same name is already registered.
type FuncFor ¶
type FuncFor[E Any] func() E
FuncFor creates a new, zero-value instance of a specific event type. This function acts as a factory, enabling the framework to decode event data from the event store into concrete Go types without knowing the types at compile time.
Usage:
// Inside an aggregate's EventFuncs method.
func (a *Account) EventFuncs() event.FuncsFor[AccountEvent] {
return event.FuncsFor[AccountEvent]{
func() AccountEvent { return new(accountOpened) },
// ... more event factories
}
}
Returns a new instance of the event type `E`.
type FuncsFor ¶
FuncsFor is a slice of event factory functions. It represents all the event types associated with a particular aggregate.
func (FuncsFor[E]) EventNames ¶
EventNames extracts the string names from all factory functions in the slice. It is a helper method primarily used for internal checks and debugging.
Returns a slice of strings, where each string is an event name.
type GlobalLog ¶
type GlobalLog interface {
Reader
Appender
GlobalReader
}
GlobalLog extends a standard Log with the ability to read all events across all aggregates, in the global order they were committed. This is useful for building projections and other system-wide read models.
type GlobalReader ¶
type GlobalReader interface {
ReadAllEvents(ctx context.Context, globalSelector version.Selector) GlobalRecords
}
GlobalReader defines the contract for reading the global, chronologically-ordered stream of all events from the event store.
This is implemented by event logs that can maintain a global, ordered version of ALL events, not just events scoped to a single log ID. The global version should always start at 1 (not 0) for compatibility with SQL databases.
Usage:
// store is an implementation of event.GlobalReader
// Read all events starting from global version 1
allEvents := store.ReadAllEvents(ctx, version.Selector{ From: 1 })
for gRecord, err := range allEvents {
// process each event from the global log
}
type GlobalRecord ¶
type GlobalRecord struct {
// contains filtered or unexported fields
}
GlobalRecord represents a persisted event read from the global event stream. It contains all the fields of a standard Record, plus the `globalVersion`, which is its unique, sequential position across all event streams in the entire store.
func NewGlobalRecord ¶
func NewGlobalRecord(
	globalVersion version.Version,
	version version.Version,
	logID LogID,
	name string,
	data []byte,
) *GlobalRecord
NewGlobalRecord is a constructor to create a new GlobalRecord instance.
Usage:
gRecord := event.NewGlobalRecord(
version.Version(101), // Global version
version.Version(5), // Stream-specific version
event.LogID("agg-2"),
"event_name",
[]byte(`{"data":"..."}`),
)
Returns a pointer to a new GlobalRecord.
func (*GlobalRecord) Data ¶
func (gr *GlobalRecord) Data() []byte
Data returns the encoded event payload from the global record.
func (*GlobalRecord) EventName ¶
func (gr *GlobalRecord) EventName() string
EventName returns the name of the event from the global record.
func (*GlobalRecord) GlobalVersion ¶
func (gr *GlobalRecord) GlobalVersion() version.Version
GlobalVersion returns the globally unique, sequential version of this event.
func (*GlobalRecord) LogID ¶
func (gr *GlobalRecord) LogID() LogID
LogID returns the identifier of the event stream this global record belongs to.
func (*GlobalRecord) Version ¶
func (gr *GlobalRecord) Version() version.Version
Version returns the sequential version of this event within its own log stream.
type GlobalRecordOrError ¶ added in v0.5.0
type GlobalRecordOrError struct {
Record *GlobalRecord
Err error
}
GlobalRecordOrError wraps a record and a potential error for transport over a channel.
type GlobalRecords ¶
type GlobalRecords iter.Seq2[*GlobalRecord, error]
GlobalRecords is an iterator for the global sequence of *GlobalRecord instances. It allows for lazy processing of all events in the store, ordered by their global version.
Usage:
// globalLog is an implementation of event.GlobalLog
globalRecords := globalLog.ReadAllEvents(ctx, version.Selector{From: 123})
for gRec, err := range globalRecords {
if err != nil {
// handle error
}
// process gRec
}
func (GlobalRecords) Chan ¶ added in v0.5.0
func (gr GlobalRecords) Chan(ctx context.Context) <-chan GlobalRecordOrError
Chan converts the iterator into a channel-based stream. It starts a new goroutine to iterate over the records and sends them to the returned channel. The channel is closed when the iterator is exhausted or the first error is encountered.
The provided context can be used to cancel the streaming operation and shut down the goroutine, preventing leaks if the consumer stops listening.
func (GlobalRecords) Collect ¶
func (r GlobalRecords) Collect() ([]*GlobalRecord, error)
Collect consumes the entire GlobalRecords iterator and loads all global event records into a slice in memory.
Usage:
// globalLog is an implementation of event.GlobalLog
globalIterator := globalLog.ReadAllEvents(ctx, version.Selector{From: 1})
allGlobalRecords, err := globalIterator.Collect()
if err != nil {
// handle error
}
Returns a slice of *GlobalRecord and the first error encountered during iteration.
type Log ¶
Log combines the Reader and Appender interfaces to represent a standard event log that can read from and write to an aggregate's event stream.
type OnSaveCheckpointErrFunc ¶ added in v0.5.0
type Raw ¶
type Raw struct {
// contains filtered or unexported fields
}
Raw represents an event that has been encoded into a byte slice. It bundles the event's name with its data payload. This is the format required by the Appender interface for writing events to the log.
type RawEvents ¶
type RawEvents []Raw
RawEvents is a slice of Raw events, representing a batch of changes that are ready to be persisted to the event log.
Usage:
rawEvents := event.RawEvents{
event.NewRaw("account/opened", []byte(`{"id":"123"}`)),
event.NewRaw("account/money_deposited", []byte(`{"amount":100}`)),
}
func (RawEvents) All ¶
All returns a Go 1.22+ iterator (iter.Seq) for the RawEvents slice, allowing for convenient and efficient iteration.
Usage:
for rawEvt := range rawEvents.All() {
// process rawEvt
}
Returns an iter.Seq[Raw].
func (RawEvents) ToRecords ¶
ToRecords converts a slice of Raw events into a slice of fully-formed *Record instances. It assigns the given logID and calculates a sequential version number for each event, starting from the provided `startingVersion`.
Usage:
logID := event.LogID("account-123")
startingVersion := version.Version(2)
records := rawEvents.ToRecords(logID, startingVersion)
// records[0] will have version 3
// records[1] will have version 4
Returns a slice of *Record pointers.
type Reader ¶
type Reader interface {
ReadEvents(ctx context.Context, id LogID, selector version.Selector) Records
}
Reader is responsible for retrieving a sequence of events for a specific aggregate, identified by its LogID. It forms the read-side of an event log.
Usage:
// store is an implementation of event.Log
records := store.ReadEvents(ctx, "account-123", version.SelectFromBeginning)
for record, err := range records {
if err != nil {
// handle error
}
// process record
}
Returns a Records iterator, which allows for lazy, stream-like processing of the event history for a single aggregate.
type Record ¶
type Record struct {
// contains filtered or unexported fields
}
Record represents a single event that has been persisted in and read from the event log. It contains the event's name and data, the ID of the log (aggregate) it belongs to, and its unique, sequential version number within that specific log. This is the primary data structure returned by the Reader interface.
func NewRecord ¶
NewRecord is a constructor to create a new Record instance.
Usage:
record := event.NewRecord(
version.Version(1),
event.LogID("agg-1"),
"event_name",
[]byte(`{"data":"..."}`),
)
Returns a pointer to a new Record.
type RecordOrError ¶ added in v0.5.0
RecordOrError wraps a record and a potential error for transport over a channel.
type Records ¶
Records is an iterator for a sequence of *Record instances for a single aggregate. It allows for lazy processing, which is efficient for aggregates with long event histories as it avoids loading all events into memory at once.
Usage:
records := store.ReadEvents(ctx, "account-123", version.SelectFromBeginning)
for record, err := range records {
if err != nil {
// handle error
}
fmt.Printf("Event: %s, Version: %d\n", record.EventName(), record.Version())
}
func (Records) Chan ¶ added in v0.5.0
func (r Records) Chan(ctx context.Context) <-chan RecordOrError
Chan converts the iterator into a channel-based stream. It starts a new goroutine to iterate over the records and sends them to the returned channel. The channel is closed when the iterator is exhausted or the first error is encountered.
The provided context can be used to cancel the streaming operation and shut down the goroutine, preventing leaks if the consumer stops listening.
func (Records) Collect ¶
Collect consumes the entire Records iterator and loads all event records into a slice in memory. This is useful when you need to have all events available before processing.
Usage:
recordsIterator := store.ReadEvents(ctx, "account-123", version.SelectFromBeginning)
allRecords, err := recordsIterator.Collect()
if err != nil {
// handle error
}
Returns a slice of *Record and the first error encountered during iteration.
type Registry ¶
type Registry[E Any] interface {
	EventRegistrar[E]
	EventFuncGetter[E]
}
Registry maps event names (strings) to factory functions that create concrete event types. It is a critical component for decoding, allowing the framework to reconstruct typed events from the raw data stored in the event log.
func NewConcreteRegistryFromAny ¶
NewConcreteRegistryFromAny creates a new type-safe registry that wraps a global, shared `event.Any` registry. This enforces application-wide uniqueness of event names.
Usage:
// In main.go or setup code:
globalRegistry := event.NewRegistry[event.Any]()
// When creating a repository for a specific aggregate:
accountRepo, err := chronicle.NewEventSourcedRepository(
...,
chronicle.AnyEventRegistry(globalRegistry),
)
Returns a new `Registry[E]`.
type SyncProjection ¶ added in v0.5.0
type SyncProjection[TX any] interface {
	MatchesEvent(eventName string) bool

	// Handle processes a batch of events within a transaction.
	// All events are from the same AppendEvents call.
	Handle(ctx context.Context, tx TX, records []*Record) error
}
SyncProjection processes events synchronously within the same transaction that appends them. It receives ALL events from a single append operation as a batch, ensuring atomic updates to both the event log and projection.
Use cases:
- Transactional outbox pattern
- Denormalized read models requiring strong consistency
- Cross-aggregate invariant enforcement
type TransactableLog ¶
type TransactableLog[TX any] struct {
	// contains filtered or unexported fields
}
TransactableLog is an event.Log that orchestrates writes within a transaction and handles messages for a synchronous projection.
func NewLogWithProjection ¶ added in v0.5.0
func NewLogWithProjection[TX any](
	log TransactionalEventLog[TX],
	projection SyncProjection[TX],
) *TransactableLog[TX]
func NewTransactableLogWithProjection ¶ added in v0.5.0
func NewTransactableLogWithProjection[TX any](
	transactor Transactor[TX],
	txLog TransactionalLog[TX],
	projection SyncProjection[TX],
) *TransactableLog[TX]
func (*TransactableLog[TX]) AppendEvents ¶
func (*TransactableLog[TX]) AppendInTx ¶ added in v0.5.0
func (*TransactableLog[TX]) ReadEvents ¶
type TransactionalEventLog ¶
type TransactionalEventLog[TX any] interface {
	TransactionalLog[TX]
	Transactor[TX]
}
type TransactionalLog ¶
type Transactor ¶
type Transformer ¶
type Transformer[E Any] interface {
	// TransformForWrite is called BEFORE events are encoded and saved to the event log.
	// It receives concrete event types and must return events of the same type.
	// Use this to encrypt, compress, or otherwise modify the events for storage.
	//
	// Returns the transformed events and an error if the transformation fails.
	TransformForWrite(ctx context.Context, events []E) ([]E, error)

	// TransformForRead is called AFTER events are loaded and decoded from the event log.
	// It should perform the inverse operation of TransformForWrite (e.g., decrypt, decompress).
	//
	// Returns the transformed events and an error if the transformation fails.
	TransformForRead(ctx context.Context, events []E) ([]E, error)
}
Transformer defines a contract for transforming events before they are written to the event log and after they are read from it. This is a useful mechanism for implementing cross-cutting concerns like encryption, compression, or event data up-casting.
Transformers are applied in the order they are provided for writing, and in reverse order for reading.
Usage:
// An example of a 1-to-1 transformer for encryption
type CryptoTransformer struct {
// dependencies like an encryption service
}
func (t *CryptoTransformer) TransformForWrite(ctx context.Context, events []*MyEvent) ([]*MyEvent, error) {
for _, event := range events {
encryptedData, err := encrypt(event.SensitiveData)
if err != nil {
return nil, err
}
event.SensitiveData = encryptedData
}
return events, nil
}
func (t *CryptoTransformer) TransformForRead(ctx context.Context, events []*MyEvent) ([]*MyEvent, error) {
for _, event := range events {
decryptedData, err := decrypt(event.SensitiveData)
if err != nil {
return nil, err
}
event.SensitiveData = decryptedData
}
return events, nil
}
// Then, pass it when creating the repository:
// repo, _ := chronicle.NewEventSourcedRepository(log, newAgg, []Transformer{&CryptoTransformer{...}})
// An example of a 1-to-many transformer (upcasting)
type UpcasterV1toV2 struct {}
func (u *UpcasterV1toV2) TransformForRead(ctx context.Context, events []event.Any) ([]event.Any, error) {
newEvents := make([]event.Any, 0, len(events))
for _, e := range events {
if oldEvent, ok := e.(*UserRegisteredV1); ok {
// Split V1 event into two V2 events
newEvents = append(newEvents, &UserCreatedV2{ID: oldEvent.ID, Timestamp: oldEvent.RegisteredAt})
newEvents = append(newEvents, &EmailAddressAddedV2{ID: oldEvent.ID, Email: oldEvent.Email})
} else {
// Pass-through other events unchanged
newEvents = append(newEvents, e)
}
}
return newEvents, nil
}
func AnyTransformerToTyped ¶
func AnyTransformerToTyped[E Any](anyTransformer AnyTransformer) Transformer[E]
AnyTransformerToTyped adapts a non-specific `AnyTransformer` to a strongly-typed `Transformer[E]`. This is a helper function used internally by the framework to allow a single generic transformer to be used with multiple repositories that have different event types.
Returns a type-safe `Transformer[E]` that wraps the generic `AnyTransformer`.
type TransformerChain ¶ added in v0.5.0
type TransformerChain[E Any] struct {
	// contains filtered or unexported fields
}
TransformerChain is a composite Transformer that applies a sequence of transformers. It implements the Transformer interface, allowing multiple transformers to be treated as a single one.
On write, it applies transformers in the order they are provided. On read, it applies them in the reverse order to correctly unwind the transformations.
func NewTransformerChain ¶ added in v0.5.0
func NewTransformerChain[E Any](transformers ...Transformer[E]) TransformerChain[E]
NewTransformerChain creates a new TransformerChain from the given transformers.
func (TransformerChain[E]) TransformForRead ¶ added in v0.5.0
func (c TransformerChain[E]) TransformForRead(ctx context.Context, events []E) ([]E, error)
TransformForRead applies each transformer in the chain in reverse order. This ensures that write-time transformations are correctly undone (e.g., decompress then decrypt).
func (TransformerChain[E]) TransformForWrite ¶ added in v0.5.0
func (c TransformerChain[E]) TransformForWrite(ctx context.Context, events []E) ([]E, error)
TransformForWrite applies each transformer in the chain sequentially. The output of one transformer becomes the input for the next.