evercore

package
v0.0.26
Published: Jun 24, 2025 License: MIT Imports: 6 Imported by: 3

Documentation

Constants

const (
	ErrorNotFound                = "not_found"
	ErrorTypeConstraintViolation = "constraint_violation"
)

Common storage engine error types

Variables

var ErrorKeyExceedsMaximumLength error = fmt.Errorf("The specified key exceeds the maximum key length")

Functions

func CopyFields

func CopyFields(e, t any) error

CopyFields uses reflection to copy matching fields from e to t. If a pointer field in e is nil, the field is skipped.
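
The skip-on-nil behavior is handy for "patch" structs, where only the fields that were set should overwrite the target. The following is a minimal sketch of that idea, not the package's implementation: `copyMatching`, `userPatch`, and `user` are hypothetical names, and the choice to dereference non-nil pointers into value fields is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// copyMatching is a hypothetical stand-in for CopyFields, not the library's code.
// It copies fields from src into the struct that dst points to when the field
// names match. Nil pointer fields in src are skipped; non-nil pointers are
// dereferenced when the destination holds the pointed-to type.
func copyMatching(src, dst any) error {
	sv := reflect.Indirect(reflect.ValueOf(src))
	dv := reflect.ValueOf(dst)
	if sv.Kind() != reflect.Struct {
		return errors.New("src must be a struct or a pointer to a struct")
	}
	if dv.Kind() != reflect.Pointer || dv.IsNil() || dv.Elem().Kind() != reflect.Struct {
		return errors.New("dst must be a non-nil pointer to a struct")
	}
	dv = dv.Elem()
	for i := 0; i < sv.NumField(); i++ {
		sf := sv.Field(i)
		df := dv.FieldByName(sv.Type().Field(i).Name)
		if !df.IsValid() || !df.CanSet() {
			continue // no matching settable field in dst
		}
		if sf.Kind() == reflect.Pointer {
			if sf.IsNil() {
				continue // nil means "leave dst unchanged"
			}
			if sf.Type().Elem() == df.Type() {
				df.Set(sf.Elem())
				continue
			}
		}
		if sf.Type().AssignableTo(df.Type()) {
			df.Set(sf)
		}
	}
	return nil
}

// userPatch and user are illustrative types only.
type userPatch struct {
	Name *string
	Age  *int
}

type user struct {
	Name string
	Age  int
}

func main() {
	name := "Ada"
	u := user{Name: "old", Age: 36}
	if err := copyMatching(userPatch{Name: &name}, &u); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(u.Name, u.Age) // Age is untouched because the patch left it nil
}
```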

func DecodeEventStateTo

func DecodeEventStateTo[U any](e SerializedEvent, state *U) error

Decodes the event state into the specified type.
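
As a rough illustration of decoding into a typed value, the sketch below assumes the event's State field holds a JSON document (the package also exposes DeserializeFromJson, but the stored format is not stated here). `decodeStateTo` and `UserRenamed` are hypothetical names, and SerializedEvent is re-declared locally so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// SerializedEvent mirrors the documented struct so the sketch is self-contained.
type SerializedEvent struct {
	AggregateId int64
	EventType   string
	State       string
	Sequence    int64
	Reference   string
	EventTime   time.Time
}

// decodeStateTo is a hypothetical equivalent of DecodeEventStateTo.
// Assumption: State is JSON.
func decodeStateTo[U any](e SerializedEvent, state *U) error {
	if err := json.Unmarshal([]byte(e.State), state); err != nil {
		return fmt.Errorf("decoding %s event: %w", e.EventType, err)
	}
	return nil
}

// UserRenamed is an illustrative payload type, not part of the package.
type UserRenamed struct {
	Name string `json:"name"`
}

func main() {
	ev := SerializedEvent{AggregateId: 1, EventType: "UserRenamed", State: `{"name":"Ada"}`, Sequence: 1}
	var st UserRenamed
	if err := decodeStateTo(ev, &st); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(st.Name)
}
```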

func DeserializeFromJson

func DeserializeFromJson[T any](serialized string, target *T) error

func InContext

func InContext[T any](ctx context.Context, store *EventStore, exec contextFuncReturns[T]) (T, error)

Executes a callback that returns a value once the context completes. This is similar to the EventStore.WithContext method, except that it allows a return value after completion.
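
One way a value-returning helper can be layered over an error-only callback API is to capture the result in a closure. The sketch below shows that pattern only. The `store` type and `withContext` method are simplified, hypothetical stand-ins (no context.Context, no real storage), so it should not be read as how the package implements InContext.

```go
package main

import (
	"errors"
	"fmt"
)

// errBoom is a sample failure used by the demo.
var errBoom = errors.New("boom")

// store is a hypothetical stand-in for *EventStore; only the callback shape matters.
type store struct{ committed int }

// withContext runs exec and "commits" only when exec returns nil.
func (s *store) withContext(exec func() error) error {
	if err := exec(); err != nil {
		return err
	}
	s.committed++
	return nil
}

// inContext adapts a value-returning callback onto withContext.
// The zero value of T is returned when the callback or the commit fails.
func inContext[T any](s *store, exec func() (T, error)) (T, error) {
	var result T
	err := s.withContext(func() error {
		var innerErr error
		result, innerErr = exec()
		return innerErr
	})
	if err != nil {
		var zero T
		return zero, err
	}
	return result, nil
}

func main() {
	s := &store{}
	total, err := inContext(s, func() (int, error) { return 42, nil })
	fmt.Println(total, err, s.committed)

	_, err = inContext(s, func() (int, error) { return 0, errBoom })
	fmt.Println(err, s.committed)
}
```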

func InReadonlyContext

func InReadonlyContext[T any](ctx context.Context, store *EventStore, exec readonlyContextFuncReturns[T]) (T, error)

Executes a function within a readonly context returning a value.

func RegisterStateEventDecoder

func RegisterStateEventDecoder(decoder EventDecoder)

RegisterStateEventDecoder sets the EventDecoder for StateAggregate - the function should be able to decode all events for any aggregate using StateAggregate.

func SerializeToJson

func SerializeToJson(ev any) string

func TypeNameFromType

func TypeNameFromType[T any](ev T) string

Types

type Aggregate

type Aggregate interface {
	GetId() int64
	SetId(int64)
	GetSequence() int64
	SetSequence(int64)
	GetAggregateType() string
	GetSnapshotFrequency() int64
	GetSnapshotState() (*string, error)
	DecodeEvent(ev SerializedEvent) (EventState, error)
	ApplyEventState(eventState EventState, eventTime time.Time, reference string) error
	ApplySnapshot(snapshot *Snapshot) error
}

Opinionated version of an aggregate.

type AggregateState

type AggregateState struct {
	AggregateId int64
	NaturalKey  *string
	Snapshot    *Snapshot
	Events      EventSlice
}

Represents the state of the aggregate including the latest snapshot (if it exists) and the events required to reconstitute its current state.
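
To make "snapshot plus events" concrete, here is a small sketch that rebuilds a running total. It assumes, purely for illustration, that snapshot and event states are decimal integers and that events arrive ordered by sequence. `rebuild` is a hypothetical helper, and the types are trimmed local copies of the documented ones.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of the documented types, trimmed to the fields used here.
type Snapshot struct {
	AggregateId int64
	State       string
	Sequence    int64
}

type SerializedEvent struct {
	AggregateId int64
	EventType   string
	State       string
	Sequence    int64
}

type AggregateState struct {
	AggregateId int64
	NaturalKey  *string
	Snapshot    *Snapshot
	Events      []SerializedEvent
}

// rebuild starts from the snapshot (if any) and applies every event whose
// sequence is newer than the snapshot's. It returns the total and the last
// sequence applied.
func rebuild(s AggregateState) (int64, int64, error) {
	var total, seq int64
	if s.Snapshot != nil {
		v, err := strconv.ParseInt(s.Snapshot.State, 10, 64)
		if err != nil {
			return 0, 0, fmt.Errorf("parsing snapshot: %w", err)
		}
		total, seq = v, s.Snapshot.Sequence
	}
	for _, ev := range s.Events {
		if ev.Sequence <= seq {
			continue // already covered by the snapshot
		}
		delta, err := strconv.ParseInt(ev.State, 10, 64)
		if err != nil {
			return 0, 0, fmt.Errorf("parsing event %d: %w", ev.Sequence, err)
		}
		total += delta
		seq = ev.Sequence
	}
	return total, seq, nil
}

func main() {
	state := AggregateState{
		AggregateId: 1,
		Snapshot:    &Snapshot{AggregateId: 1, State: "10", Sequence: 2},
		Events: []SerializedEvent{
			{AggregateId: 1, EventType: "Added", State: "5", Sequence: 3},
			{AggregateId: 1, EventType: "Added", State: "-1", Sequence: 4},
		},
	}
	fmt.Println(rebuild(state))
}
```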

type ContextOwner

type ContextOwner interface {
	// contains filtered or unexported methods
}

Methods that the EventStoreContext uses.

type EventDecoder

type EventDecoder func(aggregateType string, ev SerializedEvent) (EventState, error)

EventDecoder is a function that decodes an event into an EventState.
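
A decoder with this shape usually dispatches on the event type name. The sketch below is one possible layout under two assumptions: event state is JSON, and the event types `AccountOpened` and `FundsDeposited` are invented for illustration. SerializedEvent and EventState are trimmed local copies so the example compiles alone.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed local copies of the documented types.
type SerializedEvent struct {
	AggregateId int64
	EventType   string
	State       string
	Sequence    int64
}

type EventState interface {
	GetEventType() string
	Serialize() string
}

// AccountOpened and FundsDeposited are hypothetical event payloads.
type AccountOpened struct {
	Owner string `json:"owner"`
}

func (AccountOpened) GetEventType() string { return "AccountOpened" }
func (e AccountOpened) Serialize() string  { b, _ := json.Marshal(e); return string(b) }

type FundsDeposited struct {
	Amount int64 `json:"amount"`
}

func (FundsDeposited) GetEventType() string { return "FundsDeposited" }
func (e FundsDeposited) Serialize() string  { b, _ := json.Marshal(e); return string(b) }

// decodeAccountEvent matches the EventDecoder signature and picks the
// concrete type from ev.EventType. Assumption: State is JSON.
func decodeAccountEvent(aggregateType string, ev SerializedEvent) (EventState, error) {
	switch ev.EventType {
	case "AccountOpened":
		var s AccountOpened
		if err := json.Unmarshal([]byte(ev.State), &s); err != nil {
			return nil, err
		}
		return s, nil
	case "FundsDeposited":
		var s FundsDeposited
		if err := json.Unmarshal([]byte(ev.State), &s); err != nil {
			return nil, err
		}
		return s, nil
	default:
		return nil, fmt.Errorf("unknown event type %q for aggregate type %q", ev.EventType, aggregateType)
	}
}

func main() {
	ev := SerializedEvent{AggregateId: 1, EventType: "FundsDeposited", State: `{"amount":250}`, Sequence: 2}
	decoded, err := decodeAccountEvent("Account", ev)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(decoded.GetEventType(), decoded.(FundsDeposited).Amount)
}
```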

type EventSlice

type EventSlice []SerializedEvent

Slice of serialized events.

type EventState

type EventState interface {
	GetEventType() string
	Serialize() string
}

EventState represents the state of an event.
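
For reference, a type can satisfy this interface with two short methods. In the sketch below, `UserRenamed` is a made-up event, and serializing to JSON is an assumption rather than a requirement stated by the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EventState is re-declared locally so the sketch compiles on its own.
type EventState interface {
	GetEventType() string
	Serialize() string
}

// UserRenamed is a hypothetical event payload.
type UserRenamed struct {
	Name string `json:"name"`
}

// GetEventType returns the name under which the event would be stored.
func (UserRenamed) GetEventType() string { return "UserRenamed" }

// Serialize encodes the payload as JSON; an empty string signals a failure.
func (e UserRenamed) Serialize() string {
	b, err := json.Marshal(e)
	if err != nil {
		return ""
	}
	return string(b)
}

// Compile-time check that UserRenamed implements EventState.
var _ EventState = UserRenamed{}

func main() {
	var ev EventState = UserRenamed{Name: "Ada"}
	fmt.Println(ev.GetEventType(), ev.Serialize())
}
```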

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is the main entry point for interacting with the event store.

func NewEventStore

func NewEventStore(storageEngine StorageEngine) *EventStore

func (*EventStore) SaveState

func (store *EventStore) SaveState(ctx context.Context, tx StorageEngineTxInfo, events EventSlice, snapshots SnapshotSlice) error

SaveState is used at the end of a context to save all the events and snapshots staged during that context.

func (*EventStore) Warmup

func (store *EventStore) Warmup(ctx context.Context, knownAggregateTypes []string, knownEventTypes []string) error

Warmup pre-loads aggregate and event types into memory to avoid database calls during contexts. This is intended to be called during application startup.

Parameters:

  • knownAggregateTypes: List of aggregate type names to pre-load
  • knownEventTypes: List of event type names to pre-load

This memoizes the type IDs in an internal map, saving database calls during normal operation. Should be called as early as possible in application startup.
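
The memoization described above can be pictured as a name-to-ID map that is filled once at startup and consulted afterwards. This is a sketch of the general technique, not the package's internals: `typeCache`, `idFor`, and `warmup` are hypothetical names, and the `lookup` function stands in for a storage call such as GetEventTypeId.

```go
package main

import (
	"fmt"
	"sync"
)

// typeCache memoizes name -> id lookups behind a read/write mutex.
type typeCache struct {
	mu     sync.RWMutex
	ids    map[string]int64
	lookup func(name string) (int64, error) // stand-in for a database call
}

func newTypeCache(lookup func(name string) (int64, error)) *typeCache {
	return &typeCache{ids: make(map[string]int64), lookup: lookup}
}

// idFor returns the cached id, calling lookup only on a cache miss.
func (c *typeCache) idFor(name string) (int64, error) {
	c.mu.RLock()
	id, ok := c.ids[name]
	c.mu.RUnlock()
	if ok {
		return id, nil
	}
	id, err := c.lookup(name)
	if err != nil {
		return 0, err
	}
	c.mu.Lock()
	c.ids[name] = id
	c.mu.Unlock()
	return id, nil
}

// warmup resolves every known name up front so later calls hit the cache.
func (c *typeCache) warmup(names []string) error {
	for _, n := range names {
		if _, err := c.idFor(n); err != nil {
			return fmt.Errorf("warming %q: %w", n, err)
		}
	}
	return nil
}

func main() {
	calls := 0
	cache := newTypeCache(func(name string) (int64, error) {
		calls++
		return int64(calls), nil
	})
	if err := cache.warmup([]string{"Order", "Customer"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	id, _ := cache.idFor("Order") // served from memory, no extra lookup
	fmt.Println(id, calls)
}
```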

func (*EventStore) WithContext

func (store *EventStore) WithContext(ctx context.Context, exec contextFunc) error

Executes a function inside a context. This ensures that all events and snapshots are written after the context completes, or that none are written when an error occurs.
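
The all-or-nothing behavior follows the usual commit-on-success, rollback-on-error pattern. The sketch below shows that pattern against the documented StorageEngineTxInfo interface. `runInTx` and `recordingTx` are hypothetical, and the real method may differ in detail.

```go
package main

import (
	"errors"
	"fmt"
)

// errFailed is a sample failure used by the demo.
var errFailed = errors.New("callback failed")

// StorageEngineTxInfo is re-declared locally so the sketch compiles on its own.
type StorageEngineTxInfo interface {
	Commit() error
	Rollback() error
}

// recordingTx is a hypothetical transaction that records what happened to it.
type recordingTx struct{ committed, rolledBack bool }

func (t *recordingTx) Commit() error   { t.committed = true; return nil }
func (t *recordingTx) Rollback() error { t.rolledBack = true; return nil }

// runInTx commits when exec succeeds and rolls back when it fails.
// A rollback failure is joined with the original error rather than hiding it.
func runInTx(tx StorageEngineTxInfo, exec func() error) error {
	if err := exec(); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return errors.Join(err, rbErr)
		}
		return err
	}
	return tx.Commit()
}

func main() {
	ok := &recordingTx{}
	fmt.Println(runInTx(ok, func() error { return nil }), ok.committed, ok.rolledBack)

	bad := &recordingTx{}
	fmt.Println(runInTx(bad, func() error { return errFailed }), bad.committed, bad.rolledBack)
}
```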

func (*EventStore) WithReadonlyContext

func (store *EventStore) WithReadonlyContext(ctx context.Context, exec readonlyContextFunc) error

Executes a callback function within a readonly context.

type EventStoreContext

type EventStoreContext interface {
	EventStoreReadonlyContext
	NewAggregateId(aggregateType string) (int64, error)
	NewAggregateIdWithKey(aggregateType string, naturalKey string) (int64, error)

	LoadOrCreateAggregate(agg Aggregate, naturalKey string) (bool, error)
	CreateAggregateInto(agg Aggregate) error
	CreateAggregateWithKeyInto(agg Aggregate, naturalKey string) error
	ApplyEventTo(agg Aggregate, event EventState, time time.Time, reference string) error

	Publish(*SerializedEvent)
	SaveSnapshot(snapshot Snapshot)
}

Represents a read/write context access to the event store.

type EventStoreContextType

type EventStoreContextType struct {
	Transaction StorageEngineTxInfo
	// contains filtered or unexported fields
}

Tracks information related to the current event store context.

func (*EventStoreContextType) ApplyEventTo

func (etx *EventStoreContextType) ApplyEventTo(agg Aggregate, eventState EventState, time time.Time, reference string) error

Applies an event to the aggregate.

func (*EventStoreContextType) CreateAggregateInto

func (etx *EventStoreContextType) CreateAggregateInto(agg Aggregate) error

Creates a new aggregate of the specified type.

func (*EventStoreContextType) CreateAggregateWithKeyInto

func (etx *EventStoreContextType) CreateAggregateWithKeyInto(agg Aggregate, naturalKey string) error

Creates a new aggregate of the specified type with the specified natural key.

func (*EventStoreContextType) LoadAggregateState

func (ctx *EventStoreContextType) LoadAggregateState(aggregateType string, aggregateId int64) (*AggregateState, error)

Loads the most recent snapshot and events from the event store

func (*EventStoreContextType) LoadAggregateStateByKey

func (ctx *EventStoreContextType) LoadAggregateStateByKey(aggregateType string, naturalKey string) (*AggregateState, error)

Loads the aggregate state using the natural key.

func (*EventStoreContextType) LoadOrCreateAggregate

func (etx *EventStoreContextType) LoadOrCreateAggregate(agg Aggregate, naturalKey string) (bool, error)

LoadOrCreateAggregate loads an existing aggregate by natural key if it exists, otherwise creates a new one. Returns true if the aggregate was created, false if loaded.

func (*EventStoreContextType) LoadStateByKeyInto

func (etx *EventStoreContextType) LoadStateByKeyInto(agg Aggregate, naturalKey string) error

Loads the state of an aggregate into the aggregate using the natural key.

func (*EventStoreContextType) LoadStateInto

func (etx *EventStoreContextType) LoadStateInto(agg Aggregate, aggregateId int64) error

Loads the state of an aggregate into the aggregate.

func (EventStoreContextType) NewAggregateId

func (ctx EventStoreContextType) NewAggregateId(aggregateType string) (int64, error)

Adds a new aggregate stream and returns the resulting id.

func (EventStoreContextType) NewAggregateIdWithKey

func (ctx EventStoreContextType) NewAggregateIdWithKey(aggregateType string, naturalKey string) (int64, error)

Adds a new aggregate stream with the specified natural key and returns the resulting id.

func (*EventStoreContextType) Publish

func (ctx *EventStoreContextType) Publish(event *SerializedEvent)

Publishes an event for storage once the context completes.

func (*EventStoreContextType) SaveSnapshot

func (etx *EventStoreContextType) SaveSnapshot(snapshot Snapshot)

Stages a snapshot to save once the context completes.

type EventStoreReadonlyContext

type EventStoreReadonlyContext interface {
	LoadStateInto(aggregate Aggregate, id int64) error
	LoadStateByKeyInto(aggregate Aggregate, naturalKey string) error

	LoadAggregateState(aggregateType string, aggregateId int64) (*AggregateState, error)
	LoadAggregateStateByKey(aggregateType string, naturalKey string) (*AggregateState, error)
}

Represents a readonly context access to the event store.

type IdNameMap

type IdNameMap map[int64]string

Represents a map of id to name.

func MapIdToName

func MapIdToName(idNamePair []IdNamePair) IdNameMap

Maps a slice of IdNamePair to a map of id to name.

type IdNamePair

type IdNamePair struct {
	Id   int64
	Name string
}

Represents a pair of id and name.

type MemoryStorageEngine

type MemoryStorageEngine struct {
	CapturedEvents    []StorageEngineEvent
	CapturedSnapshots []Snapshot

	AggregateTypes     map[string]int64
	AggregateTypesInv  map[int64]string
	EventTypes         map[string]int64
	EventTypesInv      map[int64]string
	CountAggregateType int64
	CountEventTypes    int64
	CountAggregates    int64
	AggregateToTypeId  map[int64]int64
	Aggregates         map[string]int64
	AggregateInv       map[int64]*string
}

Ephemeral memory storage engine useful for testing without a database.

func NewMemoryStorageEngine

func NewMemoryStorageEngine() *MemoryStorageEngine

NewMemoryStorageEngine creates a new in-memory storage engine.

func (*MemoryStorageEngine) GetAggregateById

func (store *MemoryStorageEngine) GetAggregateById(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, aggregateId int64) (int64, *string, error)

func (*MemoryStorageEngine) GetAggregateByKey

func (store *MemoryStorageEngine) GetAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)

func (*MemoryStorageEngine) GetAggregateTypeId

func (store *MemoryStorageEngine) GetAggregateTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)

func (*MemoryStorageEngine) GetAggregateTypes

func (store *MemoryStorageEngine) GetAggregateTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

func (*MemoryStorageEngine) GetEventTypeId

func (store *MemoryStorageEngine) GetEventTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)

func (*MemoryStorageEngine) GetEventTypes

func (store *MemoryStorageEngine) GetEventTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

func (*MemoryStorageEngine) GetEventsForAggregate

func (store *MemoryStorageEngine) GetEventsForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, afterSequence int64) ([]SerializedEvent, error)

func (*MemoryStorageEngine) GetMaxKeyLength

func (stor *MemoryStorageEngine) GetMaxKeyLength() int

func (*MemoryStorageEngine) GetOrCreateAggregateByKey

func (store *MemoryStorageEngine) GetOrCreateAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (bool, int64, error)

func (*MemoryStorageEngine) GetSnapshotForAggregate

func (store *MemoryStorageEngine) GetSnapshotForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64) (*Snapshot, error)

func (*MemoryStorageEngine) GetTransactionInfo

func (stor *MemoryStorageEngine) GetTransactionInfo() (StorageEngineTxInfo, error)

func (*MemoryStorageEngine) NewAggregate

func (store *MemoryStorageEngine) NewAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64) (int64, error)

func (*MemoryStorageEngine) NewAggregateWithKey

func (store *MemoryStorageEngine) NewAggregateWithKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)

func (*MemoryStorageEngine) WriteState

func (store *MemoryStorageEngine) WriteState(tx StorageEngineTxInfo, ctx context.Context, events []StorageEngineEvent, snapshot SnapshotSlice) error

type MemoryStorageEngineTransaction

type MemoryStorageEngineTransaction struct {
}

MemoryStorageEngineTransaction is a transaction for the in-memory storage engine.

func (MemoryStorageEngineTransaction) Commit

func (tx MemoryStorageEngineTransaction) Commit() error

func (MemoryStorageEngineTransaction) Rollback

func (tx MemoryStorageEngineTransaction) Rollback() error

type NameIdMap

type NameIdMap map[string]int64

Represents a map of name to id.

func MapNameToId

func MapNameToId(idNamePair []IdNamePair) NameIdMap

Maps a slice of IdNamePair to a map of name to id.
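
Both mapping helpers (MapIdToName and MapNameToId) amount to a single pass over the pairs. The sketch below shows what such a conversion could look like. The lowercase function names mark these as illustrative re-implementations, and letting later duplicates overwrite earlier ones is an assumption.

```go
package main

import "fmt"

// Local copies of the documented types.
type IdNamePair struct {
	Id   int64
	Name string
}

type IdNameMap map[int64]string
type NameIdMap map[string]int64

// mapIdToName builds an id -> name lookup; later duplicates overwrite earlier ones.
func mapIdToName(pairs []IdNamePair) IdNameMap {
	m := make(IdNameMap, len(pairs))
	for _, p := range pairs {
		m[p.Id] = p.Name
	}
	return m
}

// mapNameToId builds the inverse name -> id lookup.
func mapNameToId(pairs []IdNamePair) NameIdMap {
	m := make(NameIdMap, len(pairs))
	for _, p := range pairs {
		m[p.Name] = p.Id
	}
	return m
}

func main() {
	pairs := []IdNamePair{{Id: 1, Name: "Customer"}, {Id: 2, Name: "Order"}}
	fmt.Println(mapIdToName(pairs)[2], mapNameToId(pairs)["Customer"])
}
```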

type SerializedEvent

type SerializedEvent struct {
	AggregateId int64
	EventType   string
	State       string
	Sequence    int64
	Reference   string
	EventTime   time.Time
}

Represents an event to be published with the state serialized.

type Snapshot

type Snapshot struct {
	AggregateId int64
	State       string
	Sequence    int64
}

Represents a snapshot of an aggregate.

type SnapshotSlice

type SnapshotSlice []Snapshot

Slice of snapshots.

type StateAggregate

type StateAggregate[T any] struct {
	Id       int64
	State    T
	Sequence int64
	// contains filtered or unexported fields
}

StateAggregate can be used for aggregates that contain simple state.

func (*StateAggregate[T]) ApplyEventState

func (t *StateAggregate[T]) ApplyEventState(eventState EventState, eventTime time.Time, reference string) error

ApplyEventState applies an event state to the aggregate

func (*StateAggregate[T]) ApplySnapshot

func (t *StateAggregate[T]) ApplySnapshot(snapshot *Snapshot) error

ApplySnapshot applies a snapshot to the aggregate

func (*StateAggregate[T]) DecodeEvent

func (t *StateAggregate[T]) DecodeEvent(ev SerializedEvent) (EventState, error)

func (*StateAggregate[T]) GetAggregateType

func (t *StateAggregate[T]) GetAggregateType() string

GetAggregateType gets the aggregate type

func (*StateAggregate[T]) GetId

func (t *StateAggregate[T]) GetId() int64

GetId gets the aggregate id

func (*StateAggregate[T]) GetSequence

func (t *StateAggregate[T]) GetSequence() int64

GetSequence gets the aggregate sequence

func (*StateAggregate[T]) GetSnapshotFrequency

func (t *StateAggregate[T]) GetSnapshotFrequency() int64

GetSnapshotFrequency gets the snapshot frequency

func (*StateAggregate[T]) GetSnapshotState

func (t *StateAggregate[T]) GetSnapshotState() (*string, error)

GetSnapshotState gets the snapshot state

func (*StateAggregate[T]) SetId

func (t *StateAggregate[T]) SetId(id int64)

SetId sets the aggregate id

func (*StateAggregate[T]) SetSequence

func (t *StateAggregate[T]) SetSequence(seq int64)

SetSequence sets the aggregate sequence

type StateEvent

type StateEvent[T any] struct {
	State T
}

func NewStateEvent

func NewStateEvent[T any](state T) StateEvent[T]

func (*StateEvent[T]) Deserialize

func (ev *StateEvent[T]) Deserialize(serialized string) error

func (StateEvent[T]) GetEventType

func (ev StateEvent[T]) GetEventType() string

func (StateEvent[T]) GetState

func (ev StateEvent[T]) GetState() any

func (StateEvent[T]) Serialize

func (ev StateEvent[T]) Serialize() string

type StorageEngine

type StorageEngine interface {

	// Gets the maximum length of a natural key
	GetMaxKeyLength() int

	// Gets the id for the event type string.  It is added if it does not already exist.
	GetEventTypeId(tx StorageEngineTxInfo, ctx context.Context, typeName string) (int64, error)

	// Gets the id for the aggregate type string.  It is added if it does not already exist.
	GetAggregateTypeId(tx StorageEngineTxInfo, ctx context.Context, typeName string) (int64, error)

	// Adds a new aggregate of the specified type to the store and returns the id.
	NewAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64) (int64, error)

	// Adds a new aggregate of the specified type with the natural key to the store and returns the id.
	NewAggregateWithKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, natrualKey string) (int64, error)

	// Gets aggregate id and corresponding natural key that corresponds to the type and id.
	GetAggregateById(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, aggregateId int64) (int64, *string, error)

	// Gets the aggregate id corresponding to the type and natural key.
	GetAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)

	// Gets or creates an aggregate id for the given type and natural key.
	// If an aggregate with the given natural key already exists, returns false and its id.
	// Otherwise creates a new aggregate with the key and returns true and its new id.
	// Returns error if:
	// - The natural key exceeds maximum length (ErrorKeyExceedsMaximumLength)
	// - The aggregate type doesn't exist
	// - There are database errors
	GetOrCreateAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (bool, int64, error)

	// Loads all the aggregate types.
	GetAggregateTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

	// Loads all the event types.
	GetEventTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

	// Gets a snapshot (if any) for the specified aggregate.
	GetSnapshotForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64) (*Snapshot, error)

	// Retrieve events for an aggregate which are after a sequence.
	GetEventsForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, afterSequence int64) ([]SerializedEvent, error)

	// Writes events to storage.
	WriteState(tx StorageEngineTxInfo, ctx context.Context, events []StorageEngineEvent, snapshot SnapshotSlice) error

	// Gets a transaction state to track multiple
	GetTransactionInfo() (StorageEngineTxInfo, error)
}

A storage engine contains the raw methods for interacting with the event store database.

Storage engine implementations should be safe to call from multiple goroutines simultaneously.
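
As an illustration of the goroutine-safety requirement together with the get-or-create semantics documented on GetOrCreateAggregateByKey, the sketch below guards a key index with a mutex. `keyIndex` and `errKeyTooLong` are hypothetical, the aggregate type, transaction, and context parameters are left out for brevity, and key length is measured in bytes as an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errKeyTooLong stands in for ErrorKeyExceedsMaximumLength.
var errKeyTooLong = errors.New("the specified key exceeds the maximum key length")

// keyIndex maps natural keys to aggregate ids and is safe for concurrent use.
type keyIndex struct {
	mu     sync.Mutex
	maxLen int
	nextID int64
	ids    map[string]int64
}

func newKeyIndex(maxLen int) *keyIndex {
	return &keyIndex{maxLen: maxLen, ids: make(map[string]int64)}
}

// getOrCreate returns (false, id) for an existing key and (true, newID) for a
// new one. The lookup and insert happen under one lock so two goroutines
// cannot both create the same key.
func (k *keyIndex) getOrCreate(naturalKey string) (bool, int64, error) {
	if len(naturalKey) > k.maxLen {
		return false, 0, errKeyTooLong
	}
	k.mu.Lock()
	defer k.mu.Unlock()
	if id, ok := k.ids[naturalKey]; ok {
		return false, id, nil
	}
	k.nextID++
	k.ids[naturalKey] = k.nextID
	return true, k.nextID, nil
}

func main() {
	idx := newKeyIndex(64)
	created := make([]bool, 8)
	var wg sync.WaitGroup
	for i := range created {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c, _, _ := idx.getOrCreate("order-1")
			created[i] = c
		}(i)
	}
	wg.Wait()

	n := 0
	for _, c := range created {
		if c {
			n++
		}
	}
	fmt.Println("creations:", n) // exactly one goroutine creates the key
}
```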

type StorageEngineError

type StorageEngineError struct {
	// The underlying error that triggered this error
	Err error
	// A human-readable message describing the error
	Message string
	// The type of error that occurred
	ErrorType string
}

StorageEngineError represents an error from the storage engine.

func NewStorageEngineError

func NewStorageEngineError(message string, err error) *StorageEngineError

NewStorageEngineError creates a new StorageEngineError.

func (*StorageEngineError) Error

func (e *StorageEngineError) Error() string

Error returns the string representation of the error.

func (*StorageEngineError) Unwrap

func (e *StorageEngineError) Unwrap() error

Unwrap returns the underlying error.
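
Because the type exposes Unwrap and an ErrorType field, callers can classify failures with the standard errors package. The sketch below re-declares the type locally and shows one possible check. The `Error` format, `isNotFound`, and `lookup` are assumptions for illustration and are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented constants and type.
const (
	ErrorNotFound                = "not_found"
	ErrorTypeConstraintViolation = "constraint_violation"
)

type StorageEngineError struct {
	Err       error
	Message   string
	ErrorType string
}

// Error formats the message; the exact format used by the package is not documented here.
func (e *StorageEngineError) Error() string {
	if e.Err != nil {
		return e.Message + ": " + e.Err.Error()
	}
	return e.Message
}

// Unwrap exposes the underlying error to errors.Is and errors.As.
func (e *StorageEngineError) Unwrap() error { return e.Err }

// isNotFound reports whether err, or anything it wraps, is a not-found storage error.
func isNotFound(err error) bool {
	var se *StorageEngineError
	return errors.As(err, &se) && se.ErrorType == ErrorNotFound
}

// lookup is a hypothetical call site that wraps a storage error with context.
func lookup(id int64) error {
	cause := &StorageEngineError{Message: "aggregate not found", ErrorType: ErrorNotFound}
	return fmt.Errorf("loading aggregate %d: %w", id, cause)
}

func main() {
	err := lookup(7)
	fmt.Println(err)
	fmt.Println(isNotFound(err))
}
```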

type StorageEngineEvent

type StorageEngineEvent struct {
	AggregateID int64
	Sequence    int64
	EventTypeID int64
	State       string
	EventTime   time.Time
	Reference   string
}

Represents an event mapping to the storage engine.

type StorageEngineTxInfo

type StorageEngineTxInfo interface {
	Commit() error
	Rollback() error
}

Interface represents transaction info needed to track the transaction.
