event

package
v0.4.1 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 22, 2026 License: MIT Imports: 5 Imported by: 0

Documentation

Overview

Package event contains types and implementations for dealing with Domain Events.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Appender

type Appender interface {
	Append(ctx context.Context, id StreamID, expected version.Check, events ...Envelope) (version.Version, error)
}

Appender is an event.Store trait used to append new Domain Events to an Event Stream.

type Envelope

type Envelope message.GenericEnvelope

Envelope contains a Domain Event and possible metadata associated to it.

Because Go lacks sum types (a.k.a. enum types), Events cannot currently take advantage of the generics feature introduced with Go 1.18.

func ToEnvelope

func ToEnvelope(event Event) Envelope

ToEnvelope returns an Envelope instance with the provided Event instance and no Metadata.

func ToEnvelopes

func ToEnvelopes(events ...Event) []Envelope

ToEnvelopes returns a list of Envelopes from a list of Events. The returned Envelopes have no Metadata.

type Event

type Event message.Message

Event is a Message representing something that has happened in the past and that is of vital importance to the Domain itself.

Event type names should be phrased in the past tense, to reinforce the notion that they describe something that has already happened.
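
As a minimal sketch of the naming convention: this page does not show message.Message, so the example assumes it reduces to an interface with a single Name() string method and declares a local stand-in for it. The UserWasCreated type and its fields are hypothetical.

```go
package main

import "fmt"

// Message is a local stand-in for message.Message. The single Name method
// is an assumption; check the message package for the real definition.
type Message interface {
	Name() string
}

// UserWasCreated is a hypothetical Domain Event, named in the past tense
// because it describes something that has already happened.
type UserWasCreated struct {
	UserID string
	Email  string
}

// Name returns the event type name.
func (UserWasCreated) Name() string { return "UserWasCreated" }

func main() {
	var evt Message = UserWasCreated{UserID: "user-1", Email: "jane@example.com"}
	fmt.Println(evt.Name())
}
```

A name such as CreateUser would read as a command (a request to do something) rather than as a fact that was recorded.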

type FusedStore

type FusedStore struct {
	Appender
	Streamer
}

FusedStore is a convenience type to fuse multiple Event Store interfaces where you might need to extend the functionality of the Store only partially.

For example, you might want to extend the behavior of the Append() method, but keep the Streamer methods the same.

If the extension wrapper does not support the Streamer interface, you cannot use the extension wrapper instance as an Event Store in certain cases (e.g. the Aggregate Repository).

Using a FusedStore instance you can fuse both instances together, and use it with the rest of the library ecosystem.

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore is a thread-safe, in-memory event.Store implementation.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

NewInMemoryStore creates a new event.InMemoryStore instance.

func (*InMemoryStore) Append

func (es *InMemoryStore) Append(
	_ context.Context,
	id StreamID,
	expected version.Check,
	events ...Envelope,
) (version.Version, error)

Append inserts the specified Domain Events into the Event Stream identified by id, returning the new version of the Event Stream.

`version.CheckExact` can be specified to enable an Optimistic Concurrency check on append, by using the expected version of the Event Stream prior to appending the new Events.

Alternatively, `version.Any` can be used if no Optimistic Concurrency check should be carried out.

An instance of `version.ConflictError` will be returned if the optimistic locking version check fails against the current version of the Event Stream.

func (*InMemoryStore) Stream

func (es *InMemoryStore) Stream(
	ctx context.Context,
	id StreamID,
	selector version.Selector,
) *Stream

Stream returns a Stream over the committed events for the given Event Stream, filtered by the provided version.Selector.

The returned Stream holds a read-lock on the underlying store for the duration of iteration; long-paused iterations will block concurrent writers.

Iteration stops if the consumer abandons the range loop or if the context is canceled between yields.

type Persisted

type Persisted struct {
	StreamID
	version.Version
	Envelope
}

Persisted represents a Domain Event that has been persisted into the Event Store.

type Processor

type Processor interface {
	Process(ctx context.Context, event Persisted) error
}

Processor represents a component that can process persisted Domain Events.

type ProcessorFunc

type ProcessorFunc func(ctx context.Context, event Persisted) error

ProcessorFunc is a functional implementation of the Processor interface.

func (ProcessorFunc) Process

func (pf ProcessorFunc) Process(ctx context.Context, event Persisted) error

Process implements the event.Processor interface.

type Store

type Store interface {
	Appender
	Streamer
}

Store represents an Event Store, a stateful data source where Domain Events can be safely stored, and easily replayed.

type Stream

type Stream = message.Stream[Persisted]

Stream is a single-use, iterator-backed sequence of persisted Domain Events coming from some stream-able source of data, like an Event Store.

Stream is an alias for message.Stream[Persisted]. See message.Stream for the full iteration and error-reporting contract.

func NewStream added in v0.4.1

func NewStream(produce func(yield func(Persisted) bool) error) *Stream

NewStream wraps a producer into a Stream. Convenience re-export of message.NewStream for values of type Persisted.

func SliceToStream

func SliceToStream(events []Persisted) *Stream

SliceToStream returns a Stream that yields each element of events in order.

Useful for tests and for adapting fully-buffered results.

type StreamID

type StreamID string

StreamID identifies an Event Stream, which is a log of ordered Domain Events.

type Streamer

type Streamer interface {
	Stream(ctx context.Context, id StreamID, selector version.Selector) *Stream
}

Streamer is an event.Store trait used to open a specific Event Stream and stream its events back to the application.

Implementations should respect ctx cancellation between yields by checking ctx.Err() at loop boundaries inside the producer.

type TrackingStore

type TrackingStore struct {
	Store
	// contains filtered or unexported fields
}

TrackingStore is an Event Store wrapper to track the Events committed to the inner Event Store.

Useful for test assertions.

TrackingStore embeds a full Store: Stream is inherited through the embedded value; only Append is overridden to record events as they are appended.

func NewTrackingStore

func NewTrackingStore(store Store) *TrackingStore

NewTrackingStore wraps an Event Store to capture events that get appended to it.

func (*TrackingStore) Append

func (es *TrackingStore) Append(
	ctx context.Context,
	id StreamID,
	expected version.Check,
	events ...Envelope,
) (version.Version, error)

Append forwards the call to the wrapped Event Store instance and, if the operation concludes successfully, records these events internally.

The recorded events can be accessed by calling Recorded().

func (*TrackingStore) Recorded

func (es *TrackingStore) Recorded() []Persisted

Recorded returns the list of Events that have been appended to the Event Store.

Each returned Persisted event carries the Version assigned by the wrapped Event Store, reconstructed from the version returned by Append and the order in which events were appended.
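
One way to picture this reconstruction, as a sketch and not the package's actual code: if each appended event bumps the stream version by exactly one (an assumption), the per-event versions follow from the version returned by Append and the batch size. Version is a local stand-in and versionsFor is a hypothetical helper.

```go
package main

import "fmt"

// Version is a local stand-in for version.Version.
type Version uint32

// versionsFor reconstructs the version of each event in a batch of n events,
// given the stream version returned by Append. It assumes every event bumps
// the stream version by exactly one and that n does not exceed returned.
func versionsFor(returned Version, n int) []Version {
	out := make([]Version, n)
	for i := range out {
		out[i] = returned - Version(n) + Version(i) + 1
	}
	return out
}

func main() {
	// Appending 3 events to a stream at version 4 returns version 7,
	// so the events sit at versions 5, 6 and 7.
	fmt.Println(versionsFor(7, 3)) // [5 6 7]
}
```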
