Documentation
¶
Overview ¶
Package eventstore provides core abstractions and types for event sourcing with Dynamic Event Streams.
This package defines the fundamental interfaces and types used across different event store implementations, including filters, storable events, and common error definitions.
The event store supports dynamic filtering of events based on:
- Event types
- JSON payload predicates
- Time ranges (occurred from/until)
Key types:
- Filter: Defines criteria for querying events
- StorableEvent: Represents an event that can be stored and retrieved
- StorableEvents: Collection of storable events
Common usage pattern:
// Create a filter for multiple event types with a predicate
filter := BuildEventFilter().
Matching().
AnyEventTypeOf(
core.BookCopyLentToReaderEventType,
core.BookCopyReturnedByReaderEventType).
AndAnyPredicateOf(P("BookID", bookID.String())).
Finalize()
events, maxSeq, err := store.Query(ctx, filter)
if err != nil {
// handle error
}
newEvent, err := eventstore.BuildStorableEvent(eventType, time.Now(), payload, metadata)
if err != nil {
// handle error
}
err = store.Append(ctx, filter, maxSeq, newEvent)
Index ¶
- Variables
- type CompletedFilterItemBuilder
- type CompletedFilterItemBuilderWithOccurredFrom
- type CompletedFilterItemBuilderWithOccurredFromToUntil
- type CompletedFilterItemBuilderWithOccurredUntil
- type EmptyFilterItemBuilder
- type Filter
- type FilterBuilder
- type FilterEventTypeString
- type FilterItem
- type FilterItemBuilderLackingEventTypes
- type FilterItemBuilderLackingPredicates
- type FilterKeyString
- type FilterPredicate
- type FilterValString
- type MaxSequenceNumberUint
- type StorableEvent
- type StorableEvents
Constants ¶
This section is empty.
Variables ¶
var ( // ErrEmptyEventsTableName is returned when an empty table name is provided during event store configuration. ErrEmptyEventsTableName = errors.New("events table name must not be empty") // ErrConcurrencyConflict is returned when an optimistic concurrency check fails during event appending. ErrConcurrencyConflict = errors.New("concurrency error, no rows were affected") // ErrNilDatabaseConnection is returned when a nil database connection is provided to event store constructors. ErrNilDatabaseConnection = errors.New("database connection must not be nil") // ErrQueryingEventsFailed is returned when the underlying database query operation fails during event retrieval. ErrQueryingEventsFailed = errors.New("querying events failed") // ErrScanningDBRowFailed is returned when database row scanning fails during event retrieval. ErrScanningDBRowFailed = errors.New("scanning db row failed") // ErrBuildingStorableEventFailed is returned when construction of a StorableEvent from database data fails. ErrBuildingStorableEventFailed = errors.New("building storable event failed") // ErrAppendingEventFailed is returned when the underlying database execution fails during event appending. ErrAppendingEventFailed = errors.New("appending the event failed") // ErrGettingRowsAffectedFailed is returned when retrieving the affected row count after database execution fails. ErrGettingRowsAffectedFailed = errors.New("getting rows affected failed") // ErrBuildingQueryFailed is returned when SQL query construction fails during event store operations. ErrBuildingQueryFailed = errors.New("building the query failed") // ErrRowsIterationFailed is returned when iterating over database result rows fails during event retrieval. ErrRowsIterationFailed = errors.New("database rows iteration failed") )
var ( // ErrInvalidPayloadJSON is returned when payload JSON is malformed or invalid. ErrInvalidPayloadJSON = errors.New("payload json is not valid") // ErrInvalidMetadataJSON is returned when metadata JSON is malformed or invalid. ErrInvalidMetadataJSON = errors.New("metadata json is not valid") )
Functions ¶
This section is empty.
Types ¶
type CompletedFilterItemBuilder ¶
type CompletedFilterItemBuilder interface {
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
// OrMatching finalizes the current FilterItem and starts a new one.
OrMatching() EmptyFilterItemBuilder
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
CompletedFilterItemBuilder represents a FilterItem with both event types and predicates. You can set time boundaries, add more items, or finalize the filter.
type CompletedFilterItemBuilderWithOccurredFrom ¶
type CompletedFilterItemBuilderWithOccurredFrom interface {
// AndOccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if AndOccurredUntil is later than OccurredFrom!
AndOccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredFromToUntil
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
CompletedFilterItemBuilderWithOccurredFrom represents a completed FilterItem with a lower time boundary set. You can optionally add an upper time boundary or finalize the filter.
type CompletedFilterItemBuilderWithOccurredFromToUntil ¶
type CompletedFilterItemBuilderWithOccurredFromToUntil interface {
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
CompletedFilterItemBuilderWithOccurredFromToUntil represents a completed FilterItem with both time boundaries set. You can only finalize the filter at this stage.
type CompletedFilterItemBuilderWithOccurredUntil ¶
type CompletedFilterItemBuilderWithOccurredUntil interface {
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
CompletedFilterItemBuilderWithOccurredUntil represents a completed FilterItem with an upper time boundary set. You can only finalize the filter at this stage.
type EmptyFilterItemBuilder ¶
type EmptyFilterItemBuilder interface {
// AnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
//
// It sanitizes the input:
// - removing empty EventTypes ("")
// - sorting the EventTypes
// - removing duplicate EventTypes
AnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) FilterItemBuilderLackingPredicates
// AnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes
// AllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes
}
EmptyFilterItemBuilder represents the initial state when starting a new FilterItem. At this stage, you must add either event types or predicates to proceed.
type Filter ¶
type Filter struct {
// contains filtered or unexported fields
}
Filter represents a complete event filter with FilterItems and optional time boundaries. Multiple FilterItems are combined with OR logic, while predicates within items follow configurable AND/OR logic based on AllPredicatesMustMatch.
func (Filter) Items ¶
func (f Filter) Items() []FilterItem
Items returns the collection of FilterItems that define the filter criteria.
func (Filter) OccurredFrom ¶
OccurredFrom returns the lower time boundary for event filtering (inclusive).
func (Filter) OccurredUntil ¶
OccurredUntil returns the upper time boundary for event filtering (inclusive).
type FilterBuilder ¶
type FilterBuilder interface {
// Matching starts a new FilterItem.
Matching() EmptyFilterItemBuilder
// MatchingAnyEvent directly creates an empty Filter.
// WARNING: This returns ALL events and should not be used in production.
MatchingAnyEvent() Filter
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
}
FilterBuilder builds generic event filters for database-specific query implementations. It enforces useful filter combinations for event-sourced workflows, supporting event types, JSON payload predicates, and time ranges. Complex combinations are supported through multiple FilterItems with OR logic between items and configurable AND/OR logic within items.
func BuildEventFilter ¶
func BuildEventFilter() FilterBuilder
BuildEventFilter creates a FilterBuilder which must eventually be finalized with Finalize() or MatchingAnyEvent(). Note: MatchingAnyEvent() returns ALL events and should not be used in production.
type FilterEventTypeString ¶
type FilterEventTypeString = string
FilterEventTypeString represents an event type identifier used in filter criteria.
type FilterItem ¶
type FilterItem struct {
// contains filtered or unexported fields
}
FilterItem represents a single filter condition combining event types and JSON predicates. Event types are always combined with OR logic, while predicates follow AND / OR logic based on the AllPredicatesMustMatch flag.
func (FilterItem) AllPredicatesMustMatch ¶
func (fi FilterItem) AllPredicatesMustMatch() bool
AllPredicatesMustMatch returns true if all predicates must match (AND logic), false if any predicate can match (OR logic).
func (FilterItem) EventTypes ¶
func (fi FilterItem) EventTypes() []FilterEventTypeString
EventTypes returns the event type identifiers for this filter item.
func (FilterItem) Predicates ¶
func (fi FilterItem) Predicates() []FilterPredicate
Predicates returns the JSON predicates for this filter item.
type FilterItemBuilderLackingEventTypes ¶
type FilterItemBuilderLackingEventTypes interface {
// AndAnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
//
// It sanitizes the input:
// - removing empty EventTypes ("")
// - sorting the EventTypes
// - removing duplicate EventTypes
AndAnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) CompletedFilterItemBuilder
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
// OrMatching finalizes the current FilterItem and starts a new one.
OrMatching() EmptyFilterItemBuilder
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
FilterItemBuilderLackingEventTypes represents a FilterItem with predicates but no event types yet. You can optionally add event types, set time boundaries, add more items, or finalize the filter.
type FilterItemBuilderLackingPredicates ¶
type FilterItemBuilderLackingPredicates interface {
// AndAnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ANY predicate to match.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AndAnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder
// AndAllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
//
// It sanitizes the input:
// - removing empty/partial FilterPredicate(s) (key or val is "")
// - sorting the FilterPredicate(s)
// - removing duplicate FilterPredicate(s)
AndAllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder
// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom
// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
//
// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
// OrMatching finalizes the current FilterItem and starts a new one.
OrMatching() EmptyFilterItemBuilder
// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
Finalize() Filter
}
FilterItemBuilderLackingPredicates represents a FilterItem with event types but no predicates yet. You can optionally add predicates, set time boundaries, add more items, or finalize the filter.
type FilterKeyString ¶
type FilterKeyString = string
FilterKeyString represents a JSON key used in filter predicate matching.
type FilterPredicate ¶
type FilterPredicate struct {
// contains filtered or unexported fields
}
FilterPredicate represents a key-value pair for matching JSON payload fields.
func P ¶
func P(key FilterKeyString, val FilterValString) FilterPredicate
P creates a new FilterPredicate with the specified key and value.
func (FilterPredicate) Key ¶
func (fp FilterPredicate) Key() FilterKeyString
Key returns the JSON key for this predicate.
func (FilterPredicate) Val ¶
func (fp FilterPredicate) Val() FilterValString
Val returns the JSON value for this predicate.
type FilterValString ¶
type FilterValString = string
FilterValString represents a JSON value used in filter predicate matching.
type MaxSequenceNumberUint ¶
type MaxSequenceNumberUint = uint
MaxSequenceNumberUint is a type alias for uint, representing the maximum sequence number for a "dynamic event stream".
type StorableEvent ¶
type StorableEvent struct {
EventType string
OccurredAt time.Time
PayloadJSON []byte
MetadataJSON []byte
}
StorableEvent is a DTO (data transfer object) used by the EventStore to append events and query them back.
It is built on scalars to be completely agnostic of the implementation of Domain Events in the client code.
While its properties are exported, it should only be constructed with the supplied factory methods:
- BuildStorableEvent
- BuildStorableEventWithEmptyMetadata
func BuildStorableEvent ¶
func BuildStorableEvent(eventType string, occurredAt time.Time, payloadJSON []byte, metadataJSON []byte) (StorableEvent, error)
BuildStorableEvent is a factory method for StorableEvent.
It populates the StorableEvent with the given scalar input. Returns an error if payloadJSON or metadataJSON are not valid JSON.
func BuildStorableEventWithEmptyMetadata ¶
func BuildStorableEventWithEmptyMetadata(eventType string, occurredAt time.Time, payloadJSON []byte) (StorableEvent, error)
BuildStorableEventWithEmptyMetadata is a factory method for StorableEvent.
It populates the StorableEvent with the given scalar input and creates valid empty JSON for MetadataJSON. Returns an error if payloadJSON is not valid JSON.
type StorableEvents ¶
type StorableEvents = []StorableEvent
StorableEvents is an alias type for a slice of StorableEvent.
Directories
¶
| Path | Synopsis |
|---|---|
|
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
|
Package postgresengine provides a PostgreSQL implementation of the eventstore interface. |
|
internal/adapters
Package adapters provide database adapter implementations for the PostgreSQL event store.
|
Package adapters provide database adapter implementations for the PostgreSQL event store. |