Documentation ¶
Overview ¶
Package eventstore provides core abstractions and types for event sourcing with Dynamic Event Streams.
This package defines the fundamental interfaces and types used across different event store implementations, including filters, storable events, and common error definitions.
The event store supports dynamic filtering of events based on:
- Event types
- JSON payload predicates
- Time ranges (occurred from/until)
Key types:
- Filter: Defines criteria for querying events
- StorableEvent: Represents an event that can be stored and retrieved
- StorableEvents: Collection of storable events
Common usage pattern:
	// Create a filter for multiple event types with a predicate
	filter := eventstore.BuildEventFilter().
		Matching().
		AnyEventTypeOf(
			core.BookCopyLentToReaderEventType,
			core.BookCopyReturnedByReaderEventType).
		AndAnyPredicateOf(eventstore.P("BookID", bookID.String())).
		Finalize()

	// Query the matching events and the max sequence number
	events, maxSeq, err := store.Query(ctx, filter)
	if err != nil {
		// handle error
	}

	// Build a new event from scalar input
	newEvent, err := eventstore.BuildStorableEvent(eventType, time.Now(), payload, metadata)
	if err != nil {
		// handle error
	}

	// Append the new event, passing the same filter and maxSeq
	err = store.Append(ctx, filter, maxSeq, newEvent)
Index ¶
- Variables
- type CompletedFilterItemBuilder
- type CompletedFilterItemBuilderWithOccurredFrom
- type CompletedFilterItemBuilderWithOccurredFromToUntil
- type CompletedFilterItemBuilderWithOccurredUntil
- type EmptyFilterItemBuilder
- type Filter
- type FilterBuilder
- type FilterEventTypeString
- type FilterItem
- type FilterItemBuilderLackingEventTypes
- type FilterItemBuilderLackingPredicates
- type FilterKeyString
- type FilterPredicate
- type FilterValString
- type MaxSequenceNumberUint
- type StorableEvent
- type StorableEvents
Constants ¶
This section is empty.
Variables ¶
var (
	ErrEmptyEventsTableName        = errors.New("events table name must not be empty")
	ErrConcurrencyConflict         = errors.New("concurrency error, no rows were affected")
	ErrNilDatabaseConnection       = errors.New("database connection must not be nil")
	ErrQueryingEventsFailed        = errors.New("querying events failed")
	ErrScanningDBRowFailed         = errors.New("scanning db row failed")
	ErrBuildingStorableEventFailed = errors.New("building storable event failed")
	ErrAppendingEventFailed        = errors.New("appending the event failed")
	ErrGettingRowsAffectedFailed   = errors.New("getting rows affected failed")
	ErrBuildingQueryFailed         = errors.New("building the query failed")
)
var ErrInvalidMetadataJSON = errors.New("metadata json is not valid")
var ErrInvalidPayloadJSON = errors.New("payload json is not valid")
Functions ¶
This section is empty.
Types ¶
type CompletedFilterItemBuilder ¶
type CompletedFilterItemBuilder interface {
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
// OrMatching finalizes the current FilterItem and starts a new one.
OrMatching() EmptyFilterItemBuilder
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
type CompletedFilterItemBuilderWithOccurredFrom ¶
type CompletedFilterItemBuilderWithOccurredFrom interface {
// AndOccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if AndOccurredUntil is later than OccurredFrom!
AndOccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredFromToUntil
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
type CompletedFilterItemBuilderWithOccurredFromToUntil ¶
type CompletedFilterItemBuilderWithOccurredFromToUntil interface {
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
type CompletedFilterItemBuilderWithOccurredUntil ¶
type CompletedFilterItemBuilderWithOccurredUntil interface {
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
type EmptyFilterItemBuilder ¶
type EmptyFilterItemBuilder interface {
// AnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
//
// It sanitizes the input:
// - removing empty EventTypes ("")
// - sorting the EventTypes
// - removing duplicate EventTypes
AnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) FilterItemBuilderLackingPredicates
// AnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ANY predicate to match.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes
// AllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes
}
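The three sanitizing steps listed in the comments above (drop empty values, sort, remove duplicates) can be illustrated for event types as follows. This is only a sketch of the described behaviour, not the package's implementation, and sanitizeEventTypes is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// sanitizeEventTypes removes empty strings, sorts the rest and drops duplicates.
// The input slice is left unmodified.
func sanitizeEventTypes(in []string) []string {
	nonEmpty := make([]string, 0, len(in))
	for _, et := range in {
		if et != "" {
			nonEmpty = append(nonEmpty, et)
		}
	}
	sort.Strings(nonEmpty)

	// After sorting, duplicates are adjacent, so comparing neighbours is enough.
	out := make([]string, 0, len(nonEmpty))
	for i, et := range nonEmpty {
		if i == 0 || et != nonEmpty[i-1] {
			out = append(out, et)
		}
	}
	return out
}

func main() {
	fmt.Println(sanitizeEventTypes([]string{"Returned", "", "Lent", "Returned"}))
}
```

Sorting before deduplicating also makes two filters built from the same inputs in a different order compare equal, which may be why the builder normalizes its input.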
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
func (Filter) Items ¶
func (f Filter) Items() []FilterItem
func (Filter) OccurredFrom ¶
func (Filter) OccurredUntil ¶
type FilterBuilder ¶
type FilterBuilder interface {
// Matching starts a new FilterItem.
Matching() EmptyFilterItemBuilder
// MatchingAnyEvent directly creates an empty Filter.
// WARNING: This returns ALL events and should not be used in production.
MatchingAnyEvent() Filter
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
}
FilterBuilder builds generic event filters for database-specific query implementations. It enforces useful filter combinations for event-sourced workflows, supporting event types, JSON payload predicates, and time ranges. Complex combinations are supported through multiple FilterItems with OR logic between items and configurable AND/OR logic within items.
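To make the combination logic concrete, the sketch below evaluates a filter against a single event in memory. It is an illustration under assumptions, not how any engine executes a Filter: the local types mirror the accessors of FilterItem, event types and predicates within one item are assumed to be combined with AND, and predicates are assumed to compare top-level string fields of the JSON payload.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type predicate struct{ key, val string }

// filterItem mirrors EventTypes(), Predicates() and AllPredicatesMustMatch().
type filterItem struct {
	eventTypes             []string
	predicates             []predicate
	allPredicatesMustMatch bool
}

type event struct {
	eventType string
	payload   []byte
}

// matchesItem applies one item: any listed event type AND the predicates.
func matchesItem(it filterItem, ev event) bool {
	if len(it.eventTypes) > 0 {
		found := false
		for _, et := range it.eventTypes {
			if et == ev.eventType {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	if len(it.predicates) == 0 {
		return true
	}
	var fields map[string]interface{}
	if err := json.Unmarshal(ev.payload, &fields); err != nil {
		return false
	}
	matched := 0
	for _, p := range it.predicates {
		if s, ok := fields[p.key].(string); ok && s == p.val {
			matched++
		}
	}
	if it.allPredicatesMustMatch {
		return matched == len(it.predicates)
	}
	return matched > 0
}

// matches applies OR logic between items; an empty filter matches every event.
func matches(items []filterItem, ev event) bool {
	if len(items) == 0 {
		return true
	}
	for _, it := range items {
		if matchesItem(it, ev) {
			return true
		}
	}
	return false
}

func main() {
	ev := event{eventType: "BookCopyLentToReader", payload: []byte(`{"BookID":"b1"}`)}
	item := filterItem{
		eventTypes: []string{"BookCopyLentToReader"},
		predicates: []predicate{{"BookID", "b1"}},
	}
	fmt.Println(matches([]filterItem{item}, ev))
}
```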
func BuildEventFilter ¶
func BuildEventFilter() FilterBuilder
BuildEventFilter creates a FilterBuilder which must eventually be finalized with Finalize() or MatchingAnyEvent(). Note: MatchingAnyEvent() returns ALL events and should not be used in production.
type FilterEventTypeString ¶
type FilterEventTypeString = string
type FilterItem ¶
type FilterItem struct {
// contains filtered or unexported fields
}
func (FilterItem) AllPredicatesMustMatch ¶
func (fi FilterItem) AllPredicatesMustMatch() bool
func (FilterItem) EventTypes ¶
func (fi FilterItem) EventTypes() []FilterEventTypeString
func (FilterItem) Predicates ¶
func (fi FilterItem) Predicates() []FilterPredicate
type FilterItemBuilderLackingEventTypes ¶
type FilterItemBuilderLackingEventTypes interface {
// AndAnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
//
// It sanitizes the input:
// - removing empty EventTypes ("")
// - sorting the EventTypes
// - removing duplicate EventTypes
AndAnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) CompletedFilterItemBuilder
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
// OrMatching finalizes the current FilterItem and starts a new one.
OrMatching() EmptyFilterItemBuilder
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
type FilterItemBuilderLackingPredicates ¶
type FilterItemBuilderLackingPredicates interface {
// AndAnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ANY predicate to match.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AndAnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder
// AndAllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AndAllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
// OrMatching finalizes the current FilterItem and starts a new one.
OrMatching() EmptyFilterItemBuilder
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
type FilterKeyString ¶
type FilterKeyString = string
type FilterPredicate ¶
type FilterPredicate struct {
// contains filtered or unexported fields
}
func P ¶
func P(key FilterKeyString, val FilterValString) FilterPredicate
func (FilterPredicate) Key ¶
func (fp FilterPredicate) Key() FilterKeyString
func (FilterPredicate) Val ¶
func (fp FilterPredicate) Val() FilterValString
type FilterValString ¶
type FilterValString = string
type MaxSequenceNumberUint ¶
type MaxSequenceNumberUint = uint
MaxSequenceNumberUint is a type alias for uint, representing the maximum sequence number for a "dynamic event stream".
type StorableEvent ¶
type StorableEvent struct {
EventType string
OccurredAt time.Time
PayloadJSON []byte
MetadataJSON []byte
}
StorableEvent is a DTO (data transfer object) used by the EventStore to append events and query them back.
It is built on scalars to be completely agnostic of the implementation of Domain Events in the client code.
While its properties are exported, it should only be constructed with the supplied factory methods:
- BuildStorableEvent
- BuildStorableEventWithEmptyMetadata
func BuildStorableEvent ¶
func BuildStorableEvent(eventType string, occurredAt time.Time, payloadJSON []byte, metadataJSON []byte) (StorableEvent, error)
BuildStorableEvent is a factory method for StorableEvent.
It populates the StorableEvent with the given scalar input. Returns an error if payloadJSON or metadataJSON is not valid JSON.
func BuildStorableEventWithEmptyMetadata ¶
func BuildStorableEventWithEmptyMetadata(eventType string, occurredAt time.Time, payloadJSON []byte) (StorableEvent, error)
BuildStorableEventWithEmptyMetadata is a factory method for StorableEvent.
It populates the StorableEvent with the given scalar input and creates valid empty JSON for MetadataJSON. Returns an error if payloadJSON is not valid JSON.
type StorableEvents ¶
type StorableEvents = []StorableEvent
StorableEvents is an alias type for a slice of StorableEvent.
Directories ¶
| Path | Synopsis |
|---|---|
| | Package postgresengine provides a PostgreSQL implementation of the eventstore interface. |
| internal/adapters | Package adapters provides database adapter implementations for the PostgreSQL event store. |