event

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 31, 2025 License: BSD-3-Clause Imports: 9 Imported by: 2

Documentation

Index

Constants

View Source
const (
	DefaultPollInterval            = 200 * time.Millisecond
	DefaultCancelCheckpointTimeout = 5 * time.Second
)

Variables

View Source
var (
	ContinueOnError = func(ctx context.Context, err error) error { return nil }
	StopOnError     = func(ctx context.Context, err error) error { return err }
)
View Source
var GlobalRegistry = NewRegistry[Any]()

Functions

func AnyToConcrete

func AnyToConcrete[E Any](event Any) (E, bool)

AnyToConcrete is a generic helper function that safely performs a type assertion from the abstract event.Any interface to a specific, concrete event type `E`. This is typically used after decoding an event from the event store.

Usage:

var anyEvent event.Any = &moneyDeposited{Amount: 100}
if depositedEvent, ok := event.AnyToConcrete[*moneyDeposited](anyEvent); ok {
    fmt.Printf("Deposited: %d\n", depositedEvent.Amount)
}

Returns the concrete event of type `E` and `true` if the assertion is successful. If the assertion fails, it returns the zero value for `E` and `false`.

Types

type Any

type Any interface {
	EventName() string
}

Any is the fundamental interface that all event types in the system must implement. It ensures that every event has a distinct name, which is used for encoding, decoding, and routing within the framework.

Usage:

type moneyDeposited struct {
    Amount int `json:"amount"`
}

// This method satisfies the event.Any interface.
func (*moneyDeposited) EventName() string { return "account/money_deposited" }

type AnyTransformer

type AnyTransformer Transformer[Any]

AnyTransformer is an alias for a Transformer that operates on the generic `event.Any` interface. This is useful for creating transformers that are not tied to a specific aggregate's event type, such as a global encryption or logging transformer.

type Appender

type Appender interface {
	AppendEvents(
		ctx context.Context,
		id LogID,
		expected version.Check,
		events RawEvents,
	) (version.Version, error)
}

Appender is responsible for writing new events to an aggregate's log atomically. It uses an optimistic concurrency check via the `expected` version parameter to prevent race conditions.

Usage:

// store is an implementation of event.Log
// rawEvents is a slice of event.Raw
// expectedVersion is the version of the aggregate before new events are applied
newVersion, err := store.AppendEvents(ctx, "account-123", version.CheckExact(expectedVersion), rawEvents)

Returns the new version of the aggregate after the events have been successfully appended. If the optimistic concurrency check fails, it returns a `version.ConflictError`.

type AsyncProjection added in v0.5.0

type AsyncProjection interface {
	MatchesEvent(eventName string) bool
	// Handle processes a single event from the global stream.
	// Checkpoint is saved based on the configured CheckpointPolicy.
	Handle(ctx context.Context, rec *GlobalRecord) error
}

AsyncProjection processes events asynchronously from the global event stream. It processes one event at a time with explicit checkpoint management for resumability and fault tolerance.

Use cases:

  • Eventually consistent projections
  • Cross-service integration
  • Analytics and reporting
  • Email notifications, etc.

type AsyncProjectionRunner added in v0.5.0

type AsyncProjectionRunner struct {
	// contains filtered or unexported fields
}

func NewAsyncProjectionRunner added in v0.5.0

func NewAsyncProjectionRunner(
	log GlobalLog,
	checkpointer Checkpointer,
	projection AsyncProjection,
	projectionName string,
	opts ...AsyncProjectionRunnerOption,
) (*AsyncProjectionRunner, error)

func (*AsyncProjectionRunner) Run added in v0.5.0

type AsyncProjectionRunnerOption added in v0.5.0

type AsyncProjectionRunnerOption func(*AsyncProjectionRunner)

func WithCancelCheckpointTimeout added in v0.5.0

func WithCancelCheckpointTimeout(timeout time.Duration) AsyncProjectionRunnerOption

func WithCheckpointPolicy added in v0.5.0

func WithCheckpointPolicy(policy CheckpointPolicy) AsyncProjectionRunnerOption

func WithLastCheckpointSaveEnabled added in v0.5.0

func WithLastCheckpointSaveEnabled(enabled bool) AsyncProjectionRunnerOption

func WithPollInterval added in v0.5.0

func WithPollInterval(interval time.Duration) AsyncProjectionRunnerOption

func WithSaveCheckpointErrPolicy added in v0.5.0

func WithSaveCheckpointErrPolicy(
	errFunc OnSaveCheckpointErrFunc,
) AsyncProjectionRunnerOption

func WithSlogHandler added in v0.5.0

func WithSlogHandler(handler slog.Handler) AsyncProjectionRunnerOption

func WithTailing added in v0.5.0

func WithTailing(enabled bool) AsyncProjectionRunnerOption

WithTailing configures the runner to use a blocking strategy suitable for event logs that wait for new events (like a channel or a NATS subscription) instead of polling. When set to true, the PollInterval is ignored.

type CheckpointInfo added in v0.5.0

type CheckpointInfo struct {
	// EventsSinceLastSave is the number of events processed since the last checkpoint was saved.
	EventsSinceLastSave int
	// TimeSinceLastSave is the duration that has passed since the last checkpoint was saved.
	TimeSinceLastSave time.Duration
}

type CheckpointPolicy added in v0.5.0

type CheckpointPolicy interface {
	// ShouldCheckpoint is called after each event is processed to decide if it's time to save a checkpoint.
	ShouldCheckpoint(info CheckpointInfo) bool
}

CheckpointPolicy determines when a checkpoint should be saved.

func AfterDuration added in v0.5.0

func AfterDuration(d time.Duration) CheckpointPolicy

AfterDuration returns a policy that triggers a checkpoint after a given duration has passed since the last checkpoint. A checkpoint is also always saved at the end of a processed batch.

func AnyOf added in v0.5.0

func AnyOf(policies ...CheckpointPolicy) CheckpointPolicy

AnyOf returns a policy that triggers a checkpoint if any of the provided policies trigger. This allows you to combine policies, e.g., "Every 100 events OR after 5 seconds".

func EveryNEvents added in v0.5.0

func EveryNEvents(n int) CheckpointPolicy

EveryNEvents returns a policy that triggers a checkpoint after N events have been processed. A checkpoint is also always saved at the end of a processed batch.

type Checkpointer added in v0.5.0

type Checkpointer interface {
	GetCheckpoint(ctx context.Context, projectionName string) (version.Version, error)
	SaveCheckpoint(ctx context.Context, projectionName string, v version.Version) error
}

type DeleterLog added in v0.5.0

type DeleterLog interface {
	DangerouslyDeleteEventsUpTo(
		ctx context.Context,
		id LogID,
		version version.Version,
	) error
}

⚠️⚠️⚠️ WARNING: Read carefully

DeleterLog permanently deletes all events for a specific log ID up to and INCLUDING the specified version.

This operation is irreversible and breaks the immutability of the event log.

It is intended for use cases manually pruning event streams, and should be used with extreme caution.

Rebuilding aggregates or projections after this operation may lead to an inconsistent state.

It is recommended to only use this after generating a snapshot event of your aggregate state before running this. Remember to also invalidate projections that depend on deleted events and any snapshots older than the version you're calling this function with.

type EventFuncCreator

type EventFuncCreator[E Any] interface {
	EventFuncs() FuncsFor[E]
}

EventFuncCreator is an interface that an aggregate root must implement. It provides a way for the aggregate to declare all of its possible event types to the event registry.

Usage:

// An aggregate implements this interface.
type Account struct { /* ... */ }
func (a *Account) EventFuncs() event.FuncsFor[AccountEvent] { /* ... */ }

type EventFuncGetter

type EventFuncGetter[E Any] interface {
	GetFunc(eventName string) (FuncFor[E], bool)
}

EventFuncGetter is responsible for the "read" side of a registry. It defines the contract for retrieving a specific event factory function by its name.

type EventRegistrar

type EventRegistrar[E Any] interface {
	RegisterEvents(root EventFuncCreator[E]) error
}

EventRegistrar is responsible for the "write" side of a registry. It defines the contract for registering event factory functions from an aggregate.

type EventRegistry

type EventRegistry[E Any] struct {
	// contains filtered or unexported fields
}

EventRegistry is a thread-safe implementation of the Registry interface. It uses a map to store event factories and a RWMutex to handle concurrent access.

func NewRegistry

func NewRegistry[E Any]() *EventRegistry[E]

NewRegistry creates a new, empty event registry for a specific event base type.

Usage:

registry := event.NewRegistry[account.AccountEvent]()

Returns a pointer to a new EventRegistry.

func (*EventRegistry[E]) GetFunc

func (r *EventRegistry[E]) GetFunc(eventName string) (FuncFor[E], bool)

GetFunc retrieves an event factory function from the registry using its unique name. The framework uses this during decoding to create a concrete event instance before unmarshaling the data.

Usage:

factory, found := registry.GetFunc("account/money_deposited")
if found {
    event := factory() // returns a new(moneyDeposited)
}

Returns the factory function and a boolean indicating whether the name was found.

func (*EventRegistry[E]) RegisterEvents

func (r *EventRegistry[E]) RegisterEvents(root EventFuncCreator[E]) error

RegisterEvents populates the registry with the event factories from an aggregate. It iterates through the functions provided by the EventFuncCreator and maps them by their event name. This is typically called once during application startup. The default repositories call this function for you.

Usage:

accountAggregate := account.NewEmpty()
err := registry.RegisterEvents(accountAggregate)

Returns an error if an event with the same name is already registered.

type FuncFor

type FuncFor[E Any] func() E

FuncFor creates a new, zero-value instance of a specific event type. This function acts as a factory, enabling the framework to decode event data from the event store into concrete Go types without knowing the types at compile time.

Usage:

// Inside an aggregate's EventFuncs method.

func (a *Account) EventFuncs() event.FuncsFor[AccountEvent] {
    return event.FuncsFor[AccountEvent]{
        func() AccountEvent { return new(accountOpened) },
        // ... more event factories
    }
}

Returns a new instance of the event type `E`.

type FuncsFor

type FuncsFor[E Any] []FuncFor[E]

FuncsFor is a slice of event factory functions. It represents all the event types associated with a particular aggregate.

func (FuncsFor[E]) EventNames

func (funcs FuncsFor[E]) EventNames() []string

EventNames extracts the string names from all factory functions in the slice. It is a helper method primarily used for internal checks and debugging.

Returns a slice of strings, where each string is an event name.

type GlobalLog

type GlobalLog interface {
	Reader
	Appender
	GlobalReader
}

GlobalLog extends a standard Log with the ability to read all events across all aggregates, in the global order they were committed. This is useful for building projections and other system-wide read models.

type GlobalReader

type GlobalReader interface {
	ReadAllEvents(ctx context.Context, globalSelector version.Selector) GlobalRecords
}

GlobalReader defines the contract for reading the global, chronologically-ordered stream of all events from the event store.

This is implemented by event logs that can maintain a global, ordered version of ALL events, not just events scoped to a single log ID. The global version should always start at 1 (not 0) for compatibility with SQL databases.

Usage:

// store is an implementation of event.GlobalReader
// Read all events starting from global version 1
allEvents := store.ReadAllEvents(ctx, version.Selector{ From: 1 })
for gRecord, err := range allEvents {
    // process each event from the global log
}

type GlobalRecord

type GlobalRecord struct {
	// contains filtered or unexported fields
}

GlobalRecord represents a persisted event read from the global event stream. It contains all the fields of a standard Record, plus the `globalVersion`, which is its unique, sequential position across all event streams in the entire store.

func NewGlobalRecord

func NewGlobalRecord(
	globalVersion version.Version,
	version version.Version,
	logID LogID,
	name string,
	data []byte,
) *GlobalRecord

NewGlobalRecord is a constructor to create a new GlobalRecord instance.

Usage:

gRecord := event.NewGlobalRecord(
	version.Version(101), // Global version
	version.Version(5),   // Stream-specific version
	event.LogID("agg-2"),
	"event_name",
	[]byte(`{"data":"..."}`),
)

Returns a pointer to a new GlobalRecord.

func (*GlobalRecord) Data

func (gr *GlobalRecord) Data() []byte

Data returns the encoded event payload from the global record.

func (*GlobalRecord) EventName

func (gr *GlobalRecord) EventName() string

EventName returns the name of the event from the global record.

func (*GlobalRecord) GlobalVersion

func (gr *GlobalRecord) GlobalVersion() version.Version

GlobalVersion returns the globally unique, sequential version of this event.

func (*GlobalRecord) LogID

func (gr *GlobalRecord) LogID() LogID

LogID returns the identifier of the event stream this global record belongs to.

func (*GlobalRecord) Version

func (gr *GlobalRecord) Version() version.Version

Version returns the sequential version of this event within its own log stream.

type GlobalRecordOrError added in v0.5.0

type GlobalRecordOrError struct {
	Record *GlobalRecord
	Err    error
}

GlobalRecordOrError wraps a record and a potential error for transport over a channel.

type GlobalRecords

type GlobalRecords iter.Seq2[*GlobalRecord, error]

GlobalRecords is an iterator for the global sequence of *GlobalRecord instances. It allows for lazy processing of all events in the store, ordered by their global version.

Usage:

// globalLog is an implementation of event.GlobalLog
globalRecords := globalLog.ReadAllEvents(ctx, version.Selector{From: 123})
for gRec, err := range globalRecords {
    if err != nil {
        // handle error
    }
    // process gRec
}

func (GlobalRecords) Chan added in v0.5.0

func (gr GlobalRecords) Chan(ctx context.Context) <-chan GlobalRecordOrError

Chan converts the iterator into a channel-based stream. It starts a new goroutine to iterate over the records and sends them to the returned channel. The channel is closed when the iterator is exhausted or the first error is encountered.

The provided context can be used to cancel the streaming operation and shut down the goroutine, preventing leaks if the consumer stops listening.

func (GlobalRecords) Collect

func (r GlobalRecords) Collect() ([]*GlobalRecord, error)

Collect consumes the entire GlobalRecords iterator and loads all global event records into a slice in memory.

Usage:

// globalLog is an implementation of event.GlobalLog
globalIterator := globalLog.ReadAllEvents(ctx, version.Selector{From: 1})
allGlobalRecords, err := globalIterator.Collect()
if err != nil {
    // handle error
}

Returns a slice of *GlobalRecord and the first error encountered during iteration.

type Log

type Log interface {
	Reader
	Appender
}

Log combines the Reader and Appender interfaces to represent a standard event log that can read from and write to an aggregate's event stream.

type LogID

type LogID string

type OnSaveCheckpointErrFunc added in v0.5.0

type OnSaveCheckpointErrFunc = func(ctx context.Context, err error) error

type Raw

type Raw struct {
	// contains filtered or unexported fields
}

Raw represents an event that has been encoded into a byte slice. It bundles the event's name with its data payload. This is the format required by the Appender interface for writing events to the log.

func NewRaw

func NewRaw(name string, data []byte) Raw

NewRaw is a constructor to create a new Raw event instance.

Usage:

rawEvent := event.NewRaw("event_name", []byte(`{"key":"value"}`))

Returns a new Raw struct.

func (*Raw) Data

func (raw *Raw) Data() []byte

Data returns the encoded event payload.

func (*Raw) EventName

func (raw *Raw) EventName() string

EventName returns the name of the event.

type RawEvents

type RawEvents []Raw

RawEvents is a slice of Raw events, representing a batch of changes that are ready to be persisted to the event log.

Usage:

rawEvents := event.RawEvents{
    event.NewRaw("account/opened", []byte(`{"id":"123"}`)),
    event.NewRaw("account/money_deposited", []byte(`{"amount":100}`)),
}

func (RawEvents) All

func (re RawEvents) All() iter.Seq[Raw]

All returns a Go 1.22+ iterator (iter.Seq) for the RawEvents slice, allowing for convenient and efficient iteration.

Usage:

for rawEvt := range rawEvents.All() {
    // process rawEvt
}

Returns an iter.Seq[Raw].

func (RawEvents) ToRecords

func (re RawEvents) ToRecords(logID LogID, startingVersion version.Version) []*Record

ToRecords converts a slice of Raw events into a slice of fully-formed *Record instances. It assigns the given logID and calculates a sequential version number for each event, starting from the provided `startingVersion`.

Usage:

logID := event.LogID("account-123")
startingVersion := version.Version(2)
records := rawEvents.ToRecords(logID, startingVersion)
// records[0] will have version 3
// records[1] will have version 4

Returns a slice of *Record pointers.

type Reader

type Reader interface {
	ReadEvents(ctx context.Context, id LogID, selector version.Selector) Records
}

Reader is responsible for retrieving a sequence of events for a specific aggregate, identified by its LogID. It forms the read-side of an event log.

Usage:

// store is an implementation of event.Log
records := store.ReadEvents(ctx, "account-123", version.SelectFromBeginning)
for record, err := range records {
    if err != nil {
        // handle error
    }
    // process record
}

Returns a Records iterator, which allows for lazy, stream-like processing of the event history for a single aggregate.

type Record

type Record struct {
	// contains filtered or unexported fields
}

Record represents a single event that has been persisted in and read from the event log. It contains the event's name and data, the ID of the log (aggregate) it belongs to, and its unique, sequential version number within that specific log. This is the primary data structure returned by the Reader interface.

func NewRecord

func NewRecord(version version.Version, logID LogID, name string, data []byte) *Record

NewRecord is a constructor to create a new Record instance.

Usage:

record := event.NewRecord(
    version.Version(1),
    event.LogID("agg-1"),
    "event_name",
    []byte(`{"data":"...}`),
)

Returns a pointer to a new Record.

func (*Record) Data

func (re *Record) Data() []byte

Data returns the encoded event payload from the record.

func (*Record) EventName

func (re *Record) EventName() string

EventName returns the name of the event from the record.

func (*Record) LogID

func (re *Record) LogID() LogID

LogID returns the identifier of the event stream this record belongs to.

func (*Record) Version

func (re *Record) Version() version.Version

Version returns the sequential version of this event within its log stream.

type RecordOrError added in v0.5.0

type RecordOrError struct {
	Record *Record
	Err    error
}

RecordOrError wraps a record and a potential error for transport over a channel.

type Records

type Records iter.Seq2[*Record, error]

Records is an iterator for a sequence of *Record instances for a single aggregate. It allows for lazy processing, which is efficient for aggregates with long event histories as it avoids loading all events into memory at once.

Usage:

records := store.ReadEvents(ctx, "account-123", version.SelectFromBeginning)
for record, err := range records {
    if err != nil {
        // handle error
    }
    fmt.Printf("Event: %s, Version: %d\n", record.EventName(), record.Version())
}

func (Records) Chan added in v0.5.0

func (r Records) Chan(ctx context.Context) <-chan RecordOrError

Chan converts the iterator into a channel-based stream. It starts a new goroutine to iterate over the records and sends them to the returned channel. The channel is closed when the iterator is exhausted or the first error is encountered.

The provided context can be used to cancel the streaming operation and shut down the goroutine, preventing leaks if the consumer stops listening.

func (Records) Collect

func (r Records) Collect() ([]*Record, error)

Collect consumes the entire Records iterator and loads all event records into a slice in memory. This is useful when you need to have all events available before processing.

Usage:

recordsIterator := store.ReadEvents(ctx, "account-123", version.SelectFromBeginning)
allRecords, err := recordsIterator.Collect()
if err != nil {
    // handle error
}

Returns a slice of *Record and the first error encountered during iteration.

type Registry

type Registry[E Any] interface {
	EventRegistrar[E]
	EventFuncGetter[E]
}

Registry maps event names (strings) to factory functions that create concrete event types. It is a critical component for decoding, allowing the framework to reconstruct typed events from the raw data stored in the event log.

func NewConcreteRegistryFromAny

func NewConcreteRegistryFromAny[E Any](anyRegistry Registry[Any]) Registry[E]

NewConcreteRegistryFromAny creates a new type-safe registry that wraps a global, shared `event.Any` registry. This enforces application-wide uniqueness of event names.

Usage:

// In main.go or setup code:
globalRegistry := event.NewRegistry[event.Any]()

// When creating a repository for a specific aggregate:
accountRepo, err := chronicle.NewEventSourcedRepository(
    ...,
    chronicle.AnyEventRegistry(globalRegistry),
)

Returns a new `Registry[E]`.

type SyncProjection added in v0.5.0

type SyncProjection[TX any] interface {
	MatchesEvent(eventName string) bool
	// Handle processes a batch of events within a transaction.
	// All events are from the same AppendEvents call.
	Handle(ctx context.Context, tx TX, records []*Record) error
}

SyncProjection processes events synchronously within the same transaction that appends them. It receives ALL events from a single append operation as a batch, ensuring atomic updates to both the event log and projection.

Use cases:

  • Transactional outbox pattern
  • Denormalized read models requiring strong consistency
  • Cross-aggregate invariant enforcement

type TransactableLog

type TransactableLog[TX any] struct {
	// contains filtered or unexported fields
}

TransactableLog is an event.Log that orchestrates writes within a transaction and handles messages for a synchronous projection.

func NewLogWithProjection added in v0.5.0

func NewLogWithProjection[TX any](
	log TransactionalEventLog[TX],
	projection SyncProjection[TX],
) *TransactableLog[TX]

func NewTransactableLogWithProjection added in v0.5.0

func NewTransactableLogWithProjection[TX any](
	transactor Transactor[TX],
	txLog TransactionalLog[TX],
	projection SyncProjection[TX],
) *TransactableLog[TX]

func (*TransactableLog[TX]) AppendEvents

func (l *TransactableLog[TX]) AppendEvents(
	ctx context.Context,
	id LogID,
	expected version.Check,
	events RawEvents,
) (version.Version, error)

func (*TransactableLog[TX]) AppendInTx added in v0.5.0

func (l *TransactableLog[TX]) AppendInTx(
	ctx context.Context,
	tx TX,
	id LogID,
	expected version.Check,
	events RawEvents,
) (version.Version, []*Record, error)

func (*TransactableLog[TX]) ReadEvents

func (l *TransactableLog[TX]) ReadEvents(
	ctx context.Context,
	id LogID,
	selector version.Selector,
) Records

func (*TransactableLog[TX]) WithinTx added in v0.5.0

func (l *TransactableLog[TX]) WithinTx(
	ctx context.Context,
	fn func(ctx context.Context, tx TX) error,
) error

type TransactionalEventLog

type TransactionalEventLog[TX any] interface {
	TransactionalLog[TX]
	Transactor[TX]
}

type TransactionalLog

type TransactionalLog[TX any] interface {
	AppendInTx(
		ctx context.Context,
		tx TX,
		id LogID,
		expected version.Check,
		events RawEvents,
	) (version.Version, []*Record, error)
	Reader
}

type Transactor

type Transactor[TX any] interface {
	WithinTx(ctx context.Context, fn func(ctx context.Context, tx TX) error) error
}

type Transformer

type Transformer[E Any] interface {
	// TransformForWrite is called BEFORE events are encoded and saved to the event log.
	// It receives a concrete event types and must returns events of the same type.
	// Use this to encrypt, compress, or otherwise modify the events for storage.
	//
	// Returns the transformed events and an error if the transformation fails.
	TransformForWrite(ctx context.Context, event []E) ([]E, error)

	// TransformForRead is called AFTER events are loaded and decoded from the event log.
	// It should perform the inverse operation of TransformForWrite (e.g., decrypt, decompress).
	//
	// Returns the transformed events and an error if the transformation fails.
	TransformForRead(ctx context.Context, event []E) ([]E, error)
}

TODO: update docs to reflect slice change, add upcasting example + tests + efficiency Transformer defines a contract for transforming events before they are written to the event log and after they are read from it. This is a useful mechanism for implementing cross-cutting concerns like encryption, compression, or event data up-casting.

Transformers are applied in the order they are provided for writing, and in reverse order for reading.

Usage:

// An example of a 1-to-1 transformer for encryption
type CryptoTransformer struct {
	// dependencies like an encryption service
}

func (t *CryptoTransformer) TransformForWrite(ctx context.Context, events []*MyEvent) ([]*MyEvent, error) {
	for _, event := range events {
		encryptedData, err := encrypt(event.SensitiveData)
		if err != nil {
			return nil, err
		}
		event.SensitiveData = encryptedData
	}
	return events, nil
}

func (t *CryptoTransformer) TransformForRead(ctx context.Context, events []*MyEvent) ([]*MyEvent, error) {
	for _, event := range events {
		decryptedData, err := decrypt(event.SensitiveData)
		if err != nil {
			return nil, err
		}
		event.SensitiveData = decryptedData
	}
	return events, nil
}

// Then, pass it when creating the repository:
// repo, _ := chronicle.NewEventSourcedRepository(log, newAgg, []Transformer{&CryptoTransformer{...}})

// An example of a 1-to-many transformer (upcasting)
type UpcasterV1toV2 struct {}

func (u *UpcasterV1toV2) TransformForRead(ctx context.Context, events []event.Any) ([]event.Any, error) {
	newEvents := make([]event.Any, 0, len(events))
	for _, e := range events {
		if oldEvent, ok := e.(*UserRegisteredV1); ok {
			// Split V1 event into two V2 events
			newEvents = append(newEvents, &UserCreatedV2{ID: oldEvent.ID, Timestamp: oldEvent.RegisteredAt})
			newEvents = append(newEvents, &EmailAddressAddedV2{ID: oldEvent.ID, Email: oldEvent.Email})
		} else {
			// Pass-through other events unchanged
			newEvents = append(newEvents, e)
		}
	}
	return newEvents, nil
}

func AnyTransformerToTyped

func AnyTransformerToTyped[E Any](anyTransformer AnyTransformer) Transformer[E]

AnyTransformerToTyped adapts a non-specific `AnyTransformer` to a strongly-typed `Transformer[E]`. This is a helper function used internally by the framework to allow a single generic transformer to be used with multiple repositories that have different event types.

Returns a type-safe `Transformer[E]` that wraps the generic `AnyTransformer`.

type TransformerChain added in v0.5.0

type TransformerChain[E Any] struct {
	// contains filtered or unexported fields
}

TransformerChain is a composite Transformer that applies a sequence of transformers. It implements the Transformer interface, allowing multiple transformers to be treated as a single one.

On write, it applies transformers in the order they are provided. On read, it applies them in the reverse order to correctly unwind the transformations.

func NewTransformerChain added in v0.5.0

func NewTransformerChain[E Any](transformers ...Transformer[E]) TransformerChain[E]

NewTransformerChain creates a new TransformerChain from the given transformers.

func (TransformerChain[E]) TransformForRead added in v0.5.0

func (c TransformerChain[E]) TransformForRead(ctx context.Context, events []E) ([]E, error)

TransformForRead applies each transformer in the chain in reverse order. This ensures that write-time transformations are correctly undone (e.g., decompress then decrypt).

func (TransformerChain[E]) TransformForWrite added in v0.5.0

func (c TransformerChain[E]) TransformForWrite(ctx context.Context, events []E) ([]E, error)

TransformForWrite applies each transformer in the chain sequentially. The output of one transformer becomes the input for the next.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL