Documentation
¶
Index ¶
- Constants
- Variables
- func CopyFields(e, t any) error
- func DecodeEventStateTo[U any](e SerializedEvent, state *U) error
- func DeserializeFromJson[T any](serialized string, target *T) error
- func InContext[T any](ctx context.Context, store *EventStore, exec contextFuncReturns[T]) (T, error)
- func InReadonlyContext[T any](ctx context.Context, store *EventStore, exec readonlyContextFuncReturns[T]) (T, error)
- func RegisterEventDecoder(decoders ...EventDecoder)
- func SerializeToJson(ev any) string
- func TypeNameFromType[T any](ev T) string
- type Aggregate
- type AggregateState
- type ApplyFunc
- type ContextOwner
- type ErrorMessage
- type EventDecoder
- type EventSlice
- type EventState
- type EventStore
- func (store *EventStore) Close() error
- func (store *EventStore) RunEphemeralSubscription(ctx context.Context, filter SubscriptionFilter, start StartFrom, opts Options, ...) error
- func (store *EventStore) RunSubscription(ctx context.Context, name string, filter SubscriptionFilter, start StartFrom, ...) error
- func (store *EventStore) SaveState(ctx context.Context, tx StorageEngineTxInfo, events EventSlice, ...) error
- func (store *EventStore) Warmup(ctx context.Context, knownAggregateTypes []string, knownEventTypes []string) error
- func (store *EventStore) WithContext(ctx context.Context, exec contextFunc) error
- func (store *EventStore) WithReadonlyContext(ctx context.Context, exec readonlyContextFunc) error
- type EventStoreContext
- type EventStoreContextType
- func (etx *EventStoreContextType) ApplyEventTo(agg Aggregate, eventState EventState, time time.Time, reference string) error
- func (etx *EventStoreContextType) ChangeAggregateNaturalKey(id int64, naturalKey string) error
- func (etx *EventStoreContextType) CreateAggregateInto(agg Aggregate) error
- func (etx *EventStoreContextType) CreateAggregateWithKeyInto(agg Aggregate, naturalKey string) error
- func (ctx *EventStoreContextType) LoadAggregateState(aggregateType string, aggregateId int64) (*AggregateState, error)
- func (ctx *EventStoreContextType) LoadAggregateStateByKey(aggregateType string, naturalKey string) (*AggregateState, error)
- func (etx *EventStoreContextType) LoadOrCreateAggregate(agg Aggregate, naturalKey string) (bool, error)
- func (etx *EventStoreContextType) LoadStateByKeyInto(agg Aggregate, naturalKey string) error
- func (etx *EventStoreContextType) LoadStateInto(agg Aggregate, aggregateId int64) error
- func (ctx EventStoreContextType) NewAggregateId(aggregateType string) (int64, error)
- func (ctx EventStoreContextType) NewAggregateIdWithKey(aggregateType string, naturalKey string) (int64, error)
- func (ctx *EventStoreContextType) Publish(event *SerializedEvent)
- func (etx *EventStoreContextType) SaveSnapshot(snapshot Snapshot)
- type EventStoreReadonlyContext
- type IdNameMap
- type IdNamePair
- type MemoryStorageEngine
- func (store *MemoryStorageEngine) AddSubscriptionEventType(tx StorageEngineTxInfo, ctx context.Context, subscriptionId int64, ...) error
- func (store *MemoryStorageEngine) AdvanceSubscriptionCursor(tx StorageEngineTxInfo, ctx context.Context, id int64, lastEventId int64) error
- func (store *MemoryStorageEngine) ChangeAggregateNaturalKey(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, ...) error
- func (store *MemoryStorageEngine) ClaimSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, ...) (bool, error)
- func (store *MemoryStorageEngine) Close() error
- func (store *MemoryStorageEngine) GetAggregateById(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, ...) (int64, *string, error)
- func (store *MemoryStorageEngine) GetAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, ...) (int64, error)
- func (store *MemoryStorageEngine) GetAggregateTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)
- func (store *MemoryStorageEngine) GetAggregateTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)
- func (store *MemoryStorageEngine) GetEventTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)
- func (store *MemoryStorageEngine) GetEventTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)
- func (store *MemoryStorageEngine) GetEventsForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, ...) ([]SerializedEvent, error)
- func (store *MemoryStorageEngine) GetEventsForSubscription(tx StorageEngineTxInfo, ctx context.Context, sub *Subscription, limit int) ([]SerializedEvent, error)
- func (store *MemoryStorageEngine) GetFirstEventIdFromTimestamp(tx StorageEngineTxInfo, ctx context.Context, ts time.Time) (int64, error)
- func (store *MemoryStorageEngine) GetMaxEventId(tx StorageEngineTxInfo, ctx context.Context) (int64, error)
- func (stor *MemoryStorageEngine) GetMaxKeyLength() int
- func (store *MemoryStorageEngine) GetOrCreateAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, ...) (bool, int64, error)
- func (store *MemoryStorageEngine) GetSnapshotForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64) (*Snapshot, error)
- func (store *MemoryStorageEngine) GetSubscriptionByName(tx StorageEngineTxInfo, ctx context.Context, name string) (*Subscription, error)
- func (stor *MemoryStorageEngine) GetTransactionInfo() (StorageEngineTxInfo, error)
- func (store *MemoryStorageEngine) NewAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64) (int64, error)
- func (store *MemoryStorageEngine) NewAggregateWithKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, ...) (int64, error)
- func (store *MemoryStorageEngine) ReleaseSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string) error
- func (store *MemoryStorageEngine) RenewSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, ...) (bool, error)
- func (store *MemoryStorageEngine) SetSubscriptionActive(tx StorageEngineTxInfo, ctx context.Context, id int64, active bool) error
- func (store *MemoryStorageEngine) UpsertSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, ...) (int64, error)
- func (store *MemoryStorageEngine) WriteState(tx StorageEngineTxInfo, ctx context.Context, events []StorageEngineEvent, ...) error
- type MemoryStorageEngineTransaction
- type NameIdMap
- type Options
- type SerializedEvent
- type Snapshot
- type SnapshotSlice
- type StartFrom
- type StateAggregate
- func (t *StateAggregate[T]) ApplyEventState(eventState EventState, eventTime time.Time, reference string) error
- func (t *StateAggregate[T]) ApplySnapshot(snapshot *Snapshot) error
- func (t *StateAggregate[T]) DecodeEvent(ev SerializedEvent) (EventState, error)
- func (t *StateAggregate[T]) GetAggregateType() string
- func (t *StateAggregate[T]) GetId() int64
- func (t *StateAggregate[T]) GetSequence() int64
- func (t *StateAggregate[T]) GetSnapshotFrequency() int64
- func (t *StateAggregate[T]) GetSnapshotState() (*string, error)
- func (t *StateAggregate[T]) SetId(id int64)
- func (t *StateAggregate[T]) SetSequence(seq int64)
- type StateEvent
- type StorageEngine
- type StorageEngineError
- type StorageEngineEvent
- type StorageEngineTxInfo
- type Subscription
- type SubscriptionFilter
Constants ¶
const ( StartBeginning = "beginning" StartEnd = "end" StartEventID = "event_id" StartTimestamp = "timestamp" )
StartFromKind identifies how a subscription initializes its cursor.
const ( ErrorNotFound = "not_found" ErrorTypeConstraintViolation = "constraint_violation" )
Common storage engine error types
Variables ¶
var ErrEventStateIsNotAStateEvent = ErrorMessage("Event state is not a StateEvent.")
var ErrSubscriptionAlreadyOwned = errors.New("subscription already owned")
ErrSubscriptionAlreadyOwned indicates the subscription could not be claimed because another owner currently holds the lease.
var ErrorKeyExceedsMaximumLength error = fmt.Errorf("The specified key exceeds the maximum key length")
Functions ¶
func CopyFields ¶
CopyFields uses reflection to copy matching fields from e to t. If a pointer field in e is nil, the field is skipped.
func DecodeEventStateTo ¶
func DecodeEventStateTo[U any](e SerializedEvent, state *U) error
Decodes the event state into the specified type.
func DeserializeFromJson ¶
func InContext ¶
func InContext[T any](ctx context.Context, store *EventStore, exec contextFuncReturns[T]) (T, error)
Executes a function callback that returns a value after the context complets. This is the similar to the EventStore.Incontext method except it allows a return value after completion.
func InReadonlyContext ¶
func InReadonlyContext[T any](ctx context.Context, store *EventStore, exec readonlyContextFuncReturns[T]) (T, error)
Executes a function within a readonly context returning a value.
func RegisterEventDecoder ¶ added in v0.0.27
func RegisterEventDecoder(decoders ...EventDecoder)
RegisterEventDecoder sets the EventDecoder for StateAggregate - the function should be able to decode all events for any aggregate using StateAggregate.
func SerializeToJson ¶
func TypeNameFromType ¶
Types ¶
type Aggregate ¶
type Aggregate interface {
GetId() int64
SetId(int64)
GetSequence() int64
SetSequence(int64)
GetAggregateType() string
GetSnapshotFrequency() int64
GetSnapshotState() (*string, error)
ApplyEventState(eventState EventState, eventTime time.Time, reference string) error
ApplySnapshot(snapshot *Snapshot) error
}
Optinionated version of aggregate
type AggregateState ¶
type AggregateState struct {
AggregateId int64
NaturalKey *string
Snapshot *Snapshot
Events EventSlice
}
Represents the state of the aggregate including the latest snapshot (if it exists) and the events required to reconstitute its current state.
type ApplyFunc ¶ added in v0.0.27
type ApplyFunc func(eventState EventState, eventTime time.Time, reference string) error
type ContextOwner ¶
type ContextOwner interface {
// contains filtered or unexported methods
}
Methods for the EventStoreContext uses.
type ErrorMessage ¶ added in v0.0.27
type ErrorMessage string
func (ErrorMessage) Error ¶ added in v0.0.27
func (e ErrorMessage) Error() string
type EventDecoder ¶
type EventDecoder func(ev SerializedEvent) (EventState, error)
EventDecoder is a function that decodes an event into an EventState.
type EventState ¶
EventState represents the state of an event.
func DecodeEvent ¶ added in v0.0.27
func DecodeEvent(ev SerializedEvent) (bool, EventState, error)
DecodeEventStateTo decodes an event into an EventState.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore is the main entry point for interacting with the event store.
func NewEventStore ¶
func NewEventStore(storageEngine StorageEngine) *EventStore
func (*EventStore) Close ¶ added in v0.0.30
func (store *EventStore) Close() error
Closes the storage engine.
func (*EventStore) RunEphemeralSubscription ¶ added in v0.0.42
func (store *EventStore) RunEphemeralSubscription( ctx context.Context, filter SubscriptionFilter, start StartFrom, opts Options, handler func(context.Context, []SerializedEvent) error, ) error
RunEphemeralSubscription streams events without creating any persisted subscription metadata or cursor. The position is tracked in-process only and is lost when the caller exits. Useful for local caches, projections that can rebuild, etc.
Supports:
- AggregateType (optional)
- AggregateKey (optional)
- EventTypes: if multiple are provided, filtering is applied in-memory. For a single event type, filtering happens in the storage engine query.
func (*EventStore) RunSubscription ¶ added in v0.0.42
func (store *EventStore) RunSubscription( ctx context.Context, name string, filter SubscriptionFilter, start StartFrom, opts Options, handler func(context.Context, []SerializedEvent) error, ) error
RunSubscription ensures a durable subscription exists, claims a lease, and processes events using handler with at-least-once semantics.
func (*EventStore) SaveState ¶
func (store *EventStore) SaveState(ctx context.Context, tx StorageEngineTxInfo, events EventSlice, snapshots SnapshotSlice) error
This is used at the end of a context to save all the events and snapshots that happened during the context.
func (*EventStore) Warmup ¶
func (store *EventStore) Warmup(ctx context.Context, knownAggregateTypes []string, knownEventTypes []string) error
Warmup pre-loads aggregate and event types into memory to avoid database calls during contexts. This is intended to be called during application startup.
Parameters:
- knownAggregateTypes: List of aggregate type names to pre-load
- knownEventTypes: List of event type names to pre-load
This memoizes the type IDs in an internal map, saving database calls during normal operation. Should be called as early as possible in application startup.
func (*EventStore) WithContext ¶
func (store *EventStore) WithContext(ctx context.Context, exec contextFunc) error
Executes a function inside a context This is used to ensure the all events and snapshots are written after the context completes or that they are not written when an error happens.
func (*EventStore) WithReadonlyContext ¶
func (store *EventStore) WithReadonlyContext(ctx context.Context, exec readonlyContextFunc) error
Executes a callback function within a readonly context
type EventStoreContext ¶
type EventStoreContext interface {
EventStoreReadonlyContext
NewAggregateId(aggregateType string) (int64, error)
NewAggregateIdWithKey(aggregateType string, naturalKey string) (int64, error)
LoadOrCreateAggregate(agg Aggregate, naturalKey string) (bool, error)
CreateAggregateInto(agg Aggregate) error
CreateAggregateWithKeyInto(agg Aggregate, naturalKey string) error
ChangeAggregateNaturalKey(id int64, naturalKey string) error
ApplyEventTo(agg Aggregate, event EventState, time time.Time, reference string) error
Publish(*SerializedEvent)
SaveSnapshot(snapshot Snapshot)
}
Represents a read/write context access to the event store.
type EventStoreContextType ¶
type EventStoreContextType struct {
Transaction StorageEngineTxInfo
// contains filtered or unexported fields
}
Tracks information related to the current event store context.
func (*EventStoreContextType) ApplyEventTo ¶
func (etx *EventStoreContextType) ApplyEventTo( agg Aggregate, eventState EventState, time time.Time, reference string) error
Applies an event to the aggregate.
func (*EventStoreContextType) ChangeAggregateNaturalKey ¶ added in v0.0.35
func (etx *EventStoreContextType) ChangeAggregateNaturalKey( id int64, naturalKey string) error
Changes the natural key of an aggregate.
func (*EventStoreContextType) CreateAggregateInto ¶
func (etx *EventStoreContextType) CreateAggregateInto(agg Aggregate) error
Creates a new aggregate of the specified type.
func (*EventStoreContextType) CreateAggregateWithKeyInto ¶
func (etx *EventStoreContextType) CreateAggregateWithKeyInto( agg Aggregate, naturalKey string) error
Creates a new aggregate of the specified type with the specified natural key.
func (*EventStoreContextType) LoadAggregateState ¶
func (ctx *EventStoreContextType) LoadAggregateState( aggregateType string, aggregateId int64) (*AggregateState, error)
Loads the most recent snapshot and events from the event store
func (*EventStoreContextType) LoadAggregateStateByKey ¶
func (ctx *EventStoreContextType) LoadAggregateStateByKey( aggregateType string, naturalKey string) (*AggregateState, error)
Loads the aggregate state using the natural key.
func (*EventStoreContextType) LoadOrCreateAggregate ¶
func (etx *EventStoreContextType) LoadOrCreateAggregate( agg Aggregate, naturalKey string) (bool, error)
LoadOrCreateAggregate loads an existing aggregate by natural key if it exists, otherwise creates a new one. Returns true if the aggregate was created, false if loaded.
func (*EventStoreContextType) LoadStateByKeyInto ¶
func (etx *EventStoreContextType) LoadStateByKeyInto( agg Aggregate, naturalKey string) error
Loads the state of an aggregate into the aggregate using the natural key.
func (*EventStoreContextType) LoadStateInto ¶
func (etx *EventStoreContextType) LoadStateInto( agg Aggregate, aggregateId int64) error
Loads the state of an aggregate into the aggregate.
func (EventStoreContextType) NewAggregateId ¶
func (ctx EventStoreContextType) NewAggregateId(aggregateType string) (int64, error)
Adds a new aggregate stream and returns the resulting id.
func (EventStoreContextType) NewAggregateIdWithKey ¶
func (ctx EventStoreContextType) NewAggregateIdWithKey( aggregateType string, naturalKey string) (int64, error)
Adds a new aggregate stream with the specified natural key and returns the resulting id.
func (*EventStoreContextType) Publish ¶
func (ctx *EventStoreContextType) Publish(event *SerializedEvent)
Publishes an event for storage once the context completes.
func (*EventStoreContextType) SaveSnapshot ¶
func (etx *EventStoreContextType) SaveSnapshot(snapshot Snapshot)
Stages a snapshot to save once the context completes.
type EventStoreReadonlyContext ¶
type EventStoreReadonlyContext interface {
LoadStateInto(aggregate Aggregate, id int64) error
LoadStateByKeyInto(aggregate Aggregate, naturalKey string) error
LoadAggregateState(aggregateType string, aggregateId int64) (*AggregateState, error)
LoadAggregateStateByKey(aggregateType string, naturalKey string) (*AggregateState, error)
}
Represents a readonly context access to the event store.
type IdNameMap ¶
Represents a map of id to name.
func MapIdToName ¶
func MapIdToName(idNamePair []IdNamePair) IdNameMap
Maps a slice of IdNamePair to a map of id to name.
type MemoryStorageEngine ¶
type MemoryStorageEngine struct {
CapturedEvents []StorageEngineEvent
CapturedSnapshots []Snapshot
AggregateTypes map[string]int64
AggregateTypesInv map[int64]string
EventTypes map[string]int64
EventTypesInv map[int64]string
CountAggregateType int64
CountEventTypes int64
CountAggregates int64
AggregateToTypeId map[int64]int64
Aggregates map[string]int64
AggregateInv map[int64]*string
// Subscriptions (in-memory only)
Subscriptions map[string]*Subscription
SubscriptionByID map[int64]*Subscription
SubscriptionEventTypes map[int64]map[int64]bool // subscriptionID -> set(eventTypeID)
NextSubscriptionID int64
}
Ephemeral memory storage engine useful for testing without a database.
func NewMemoryStorageEngine ¶
func NewMemoryStorageEngine() *MemoryStorageEngine
NewMemoryStorageEngine creates a new in-memory storage engine.
func (*MemoryStorageEngine) AddSubscriptionEventType ¶ added in v0.0.42
func (store *MemoryStorageEngine) AddSubscriptionEventType(tx StorageEngineTxInfo, ctx context.Context, subscriptionId int64, eventTypeId int64) error
func (*MemoryStorageEngine) AdvanceSubscriptionCursor ¶ added in v0.0.42
func (store *MemoryStorageEngine) AdvanceSubscriptionCursor(tx StorageEngineTxInfo, ctx context.Context, id int64, lastEventId int64) error
func (*MemoryStorageEngine) ChangeAggregateNaturalKey ¶ added in v0.0.33
func (store *MemoryStorageEngine) ChangeAggregateNaturalKey(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, naturalKey string) error
func (*MemoryStorageEngine) ClaimSubscription ¶ added in v0.0.42
func (store *MemoryStorageEngine) ClaimSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)
func (*MemoryStorageEngine) Close ¶ added in v0.0.30
func (store *MemoryStorageEngine) Close() error
func (*MemoryStorageEngine) GetAggregateById ¶
func (store *MemoryStorageEngine) GetAggregateById(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, aggregateId int64) (int64, *string, error)
func (*MemoryStorageEngine) GetAggregateByKey ¶
func (store *MemoryStorageEngine) GetAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)
func (*MemoryStorageEngine) GetAggregateTypeId ¶
func (store *MemoryStorageEngine) GetAggregateTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)
func (*MemoryStorageEngine) GetAggregateTypes ¶
func (store *MemoryStorageEngine) GetAggregateTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)
func (*MemoryStorageEngine) GetEventTypeId ¶
func (store *MemoryStorageEngine) GetEventTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)
func (*MemoryStorageEngine) GetEventTypes ¶
func (store *MemoryStorageEngine) GetEventTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)
func (*MemoryStorageEngine) GetEventsForAggregate ¶
func (store *MemoryStorageEngine) GetEventsForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, afterSequence int64) ([]SerializedEvent, error)
func (*MemoryStorageEngine) GetEventsForSubscription ¶ added in v0.0.42
func (store *MemoryStorageEngine) GetEventsForSubscription(tx StorageEngineTxInfo, ctx context.Context, sub *Subscription, limit int) ([]SerializedEvent, error)
func (*MemoryStorageEngine) GetFirstEventIdFromTimestamp ¶ added in v0.0.42
func (store *MemoryStorageEngine) GetFirstEventIdFromTimestamp(tx StorageEngineTxInfo, ctx context.Context, ts time.Time) (int64, error)
func (*MemoryStorageEngine) GetMaxEventId ¶ added in v0.0.42
func (store *MemoryStorageEngine) GetMaxEventId(tx StorageEngineTxInfo, ctx context.Context) (int64, error)
func (*MemoryStorageEngine) GetMaxKeyLength ¶
func (stor *MemoryStorageEngine) GetMaxKeyLength() int
func (*MemoryStorageEngine) GetOrCreateAggregateByKey ¶
func (store *MemoryStorageEngine) GetOrCreateAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (bool, int64, error)
func (*MemoryStorageEngine) GetSnapshotForAggregate ¶
func (store *MemoryStorageEngine) GetSnapshotForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64) (*Snapshot, error)
func (*MemoryStorageEngine) GetSubscriptionByName ¶ added in v0.0.42
func (store *MemoryStorageEngine) GetSubscriptionByName(tx StorageEngineTxInfo, ctx context.Context, name string) (*Subscription, error)
func (*MemoryStorageEngine) GetTransactionInfo ¶
func (stor *MemoryStorageEngine) GetTransactionInfo() (StorageEngineTxInfo, error)
func (*MemoryStorageEngine) NewAggregate ¶
func (store *MemoryStorageEngine) NewAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64) (int64, error)
func (*MemoryStorageEngine) NewAggregateWithKey ¶
func (store *MemoryStorageEngine) NewAggregateWithKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)
func (*MemoryStorageEngine) ReleaseSubscription ¶ added in v0.0.42
func (store *MemoryStorageEngine) ReleaseSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string) error
func (*MemoryStorageEngine) RenewSubscription ¶ added in v0.0.42
func (store *MemoryStorageEngine) RenewSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)
func (*MemoryStorageEngine) SetSubscriptionActive ¶ added in v0.0.42
func (store *MemoryStorageEngine) SetSubscriptionActive(tx StorageEngineTxInfo, ctx context.Context, id int64, active bool) error
func (*MemoryStorageEngine) UpsertSubscription ¶ added in v0.0.42
func (*MemoryStorageEngine) WriteState ¶
func (store *MemoryStorageEngine) WriteState(tx StorageEngineTxInfo, ctx context.Context, events []StorageEngineEvent, snapshot SnapshotSlice) error
type MemoryStorageEngineTransaction ¶
type MemoryStorageEngineTransaction struct {
}
MemoryStorageEngineTransaction is a transaction for the in-memory storage engine.
func (MemoryStorageEngineTransaction) Commit ¶
func (tx MemoryStorageEngineTransaction) Commit() error
func (MemoryStorageEngineTransaction) Rollback ¶
func (tx MemoryStorageEngineTransaction) Rollback() error
type NameIdMap ¶
Represents a map of name to id.
func MapNameToId ¶
func MapNameToId(idNamePair []IdNamePair) NameIdMap
Maps a slice of IdNamePair to a map of name to id.
type Options ¶ added in v0.0.42
type Options struct {
BatchSize int
PollInterval time.Duration
Lease time.Duration
Owner string // optional; autogenerated if empty
}
Options controls runtime behavior of a subscription runner.
type SerializedEvent ¶
type SerializedEvent struct {
// EventID is the global, monotonically increasing id from the events table.
// It may be zero when events are loaded by-aggregate (not via event_log).
EventID int64
AggregateId int64
EventType string
State string
Sequence int64
Reference string
EventTime time.Time
}
Represents an event to be published with the state serialized.
type StateAggregate ¶
type StateAggregate[T any] struct { Id int64 State T Sequence int64 // contains filtered or unexported fields }
StateAggregate can be used for aggregates that contain simple state. It provides common aggregate functionality and handles state management.
HandleOtherEvents is an optional function that can be set to handle events that don't implement iStateEvent. When an event is applied that isn't a StateEvent, this function will be called if set. If not set, such events will return ErrEventStateIsNotAStateEvent.
Example usage:
aggregate.HandleOtherEvents = func(eventState EventState, eventTime time.Time, reference string) error {
switch event := eventState.(type) {
case SetActiveEvent:
aggregate.State.Active = event.Active
return nil
default:
return fmt.Errorf("unknown event type %s", event.GetEventType())
}
}
func (*StateAggregate[T]) ApplyEventState ¶
func (t *StateAggregate[T]) ApplyEventState(eventState EventState, eventTime time.Time, reference string) error
ApplyEventState applies an event state to the aggregate
func (*StateAggregate[T]) ApplySnapshot ¶
func (t *StateAggregate[T]) ApplySnapshot(snapshot *Snapshot) error
ApplySnapshot applies a snapshot to the aggregate
func (*StateAggregate[T]) DecodeEvent ¶
func (t *StateAggregate[T]) DecodeEvent(ev SerializedEvent) (EventState, error)
func (*StateAggregate[T]) GetAggregateType ¶
func (t *StateAggregate[T]) GetAggregateType() string
GetAggregateType gets the aggregate type
func (*StateAggregate[T]) GetId ¶
func (t *StateAggregate[T]) GetId() int64
GetId gets the aggregate id
func (*StateAggregate[T]) GetSequence ¶
func (t *StateAggregate[T]) GetSequence() int64
Implement Aggregate interface for StateAggregate
func (*StateAggregate[T]) GetSnapshotFrequency ¶
func (t *StateAggregate[T]) GetSnapshotFrequency() int64
GetSnapshotFrequency gets the snapshot frequency
func (*StateAggregate[T]) GetSnapshotState ¶
func (t *StateAggregate[T]) GetSnapshotState() (*string, error)
GetSnapshotState gets the snapshot state
func (*StateAggregate[T]) SetId ¶
func (t *StateAggregate[T]) SetId(id int64)
SetId sets the aggregate id
func (*StateAggregate[T]) SetSequence ¶
func (t *StateAggregate[T]) SetSequence(seq int64)
SetSequence sets the aggregate sequence
type StateEvent ¶
type StateEvent[T any] struct { State T }
func NewStateEvent ¶
func NewStateEvent[T any](state T) StateEvent[T]
func (*StateEvent[T]) Deserialize ¶
func (ev *StateEvent[T]) Deserialize(serialized string) error
func (StateEvent[T]) GetEventType ¶
func (ev StateEvent[T]) GetEventType() string
func (StateEvent[T]) GetState ¶
func (ev StateEvent[T]) GetState() any
func (StateEvent[T]) Serialize ¶
func (ev StateEvent[T]) Serialize() string
type StorageEngine ¶
type StorageEngine interface {
// Gets the maximum length of a natural key
GetMaxKeyLength() int
// Gets the id for the event type string. It is added if it does not already exist.
GetEventTypeId(tx StorageEngineTxInfo, ctx context.Context, typeName string) (int64, error)
// Gets the id for the aggregate type string. It is added if it does not already exist.
GetAggregateTypeId(tx StorageEngineTxInfo, ctx context.Context, typeName string) (int64, error)
// Adds a new aggregate of the specified type to the store and returns the id.
NewAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64) (int64, error)
// Adds a new aggregate of the specified type with the natural key to the store and returns the id.
NewAggregateWithKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, natrualKey string) (int64, error)
// Gets aggregate id and corresponding natural key that corresponds to the type and id.
GetAggregateById(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, aggregateId int64) (int64, *string, error)
// Gets the aggregate id corresponding to the type and natural key.
GetAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)
// Gets or creates an aggregate id for the given type and natural key.
// If an aggregate with the given natural key already exists, returns false and its id.
// Otherwise creates a new aggregate with the key and returns true and its new id.
// Returns error if:
// - The natural key exceeds maximum length (ErrorKeyExceedsMaximumLength)
// - The aggregate type doesn't exist
// - There are database errors
GetOrCreateAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (bool, int64, error)
// Changes the natural key of an aggregate.
ChangeAggregateNaturalKey(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, naturalKey string) error
// Loads all the aggregate types.
GetAggregateTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)
// Loads all the event types.
GetEventTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)
// Gets a snapshot (if any) for the specified aggregate.
GetSnapshotForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64) (*Snapshot, error)
// Retrieve events for an aggregate which are after a sequence.
GetEventsForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, afterSequence int64) ([]SerializedEvent, error)
// Writes events to storage.
WriteState(tx StorageEngineTxInfo, ctx context.Context, events []StorageEngineEvent, snapshot SnapshotSlice) error
// Gets a transaction state to track multiple
GetTransactionInfo() (StorageEngineTxInfo, error)
// Closes the storage engine.
Close() error
// -------- Durable Subscriptions --------
// Create or update a subscription by name. If updating, last_event_id is not modified.
UpsertSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, aggregateTypeId *int64, eventTypeId *int64, aggregateKey *string, startFrom string, startEventId int64, startTimestamp *time.Time) (int64, error)
// Optionally associate multiple event types to a subscription (enables multi-type filtering).
AddSubscriptionEventType(tx StorageEngineTxInfo, ctx context.Context, subscriptionId int64, eventTypeId int64) error
// Fetch a subscription by name.
GetSubscriptionByName(tx StorageEngineTxInfo, ctx context.Context, name string) (*Subscription, error)
// Activate/deactivate a subscription.
SetSubscriptionActive(tx StorageEngineTxInfo, ctx context.Context, id int64, active bool) error
// Cooperative lease management for distributed runners.
ClaimSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)
RenewSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)
ReleaseSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string) error
// Event streaming for a subscription and cursor advancement.
GetEventsForSubscription(tx StorageEngineTxInfo, ctx context.Context, sub *Subscription, limit int) ([]SerializedEvent, error)
AdvanceSubscriptionCursor(tx StorageEngineTxInfo, ctx context.Context, id int64, lastEventId int64) error
// Helpers for initializing cursors.
GetMaxEventId(tx StorageEngineTxInfo, ctx context.Context) (int64, error)
GetFirstEventIdFromTimestamp(tx StorageEngineTxInfo, ctx context.Context, ts time.Time) (int64, error)
}
A storage engine contains the raw methods for interacting with the event store database.
The storage engine implementations should implemented to be safe for calling from multiple goroutines simultaneously.
type StorageEngineError ¶
type StorageEngineError struct {
// The underlying error that triggered this error
Err error
// A human-readable message describing the error
Message string
// The type of error that occurred
ErrorType string
}
StorageEngineError represents an error from the storage engine.
func NewStorageEngineError ¶
func NewStorageEngineError(message string, err error) *StorageEngineError
NewStorageEngineError creates a new StorageEngineError.
func (*StorageEngineError) Error ¶
func (e *StorageEngineError) Error() string
Error returns the string representation of the error.
func (*StorageEngineError) Unwrap ¶
func (e *StorageEngineError) Unwrap() error
Unwrap returns the underlying error.
type StorageEngineEvent ¶
type StorageEngineEvent struct {
AggregateID int64
Sequence int64
EventTypeID int64
State string
EventTime time.Time
Reference string
}
Represents an event mapping to the storage engine.
type StorageEngineTxInfo ¶
Interface represents transaction info needed to track the transaction.
type Subscription ¶ added in v0.0.42
type Subscription struct {
ID int64
Name string
AggregateTypeID *int64
EventTypeID *int64
AggregateKey *string
StartFrom string
StartEventID int64
StartTimestamp *time.Time
LastEventID int64
Active bool
LeaseOwner *string
LeaseExpiresAt *time.Time
}
Subscription describes a durable reader over the event log with optional filters and a durable cursor stored as last_event_id.
type SubscriptionFilter ¶ added in v0.0.42
type SubscriptionFilter struct {
AggregateType string // optional; empty means all aggregates
EventTypes []string // optional; empty means all event types
AggregateKey *string // optional
}
SubscriptionFilter controls which events are read by a subscription.