eventstore

package
v1.2.0-beta
This package is not in the latest version of its module.
Published: Aug 1, 2025 License: GPL-3.0 Imports: 4 Imported by: 0

Documentation

Overview

Package eventstore provides core abstractions and types for event sourcing with Dynamic Event Streams.

This package defines the fundamental interfaces and types used across different event store implementations, including filters, storable events, and common error definitions.

The event store supports dynamic filtering of events based on:

  • Event types
  • JSON payload predicates
  • Time ranges (occurred from/until)

Key types:

  • Filter: Defines criteria for querying events
  • StorableEvent: Represents an event that can be stored and retrieved
  • StorableEvents: Collection of storable events

Common usage pattern:

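// Create a filter for multiple event types with a predicate.
filter := eventstore.BuildEventFilter().
	Matching().
	AnyEventTypeOf(
		core.BookCopyLentToReaderEventType,
		core.BookCopyReturnedByReaderEventType).
	AndAnyPredicateOf(eventstore.P("BookID", bookID.String())).
	Finalize()

// Query the matching events together with the current max sequence number.
events, maxSeq, err := store.Query(ctx, filter)
if err != nil {
	// handle error
}

// Build the new event; this fails if payload or metadata is not valid JSON.
newEvent, err := eventstore.BuildStorableEvent(eventType, time.Now(), payload, metadata)
if err != nil {
	// handle error
}

// Append, passing the filter and maxSeq from the query for the optimistic concurrency check.
err = store.Append(ctx, filter, maxSeq, newEvent)
if err != nil {
	// handle error, e.g. eventstore.ErrConcurrencyConflict
}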

Constants

This section is empty.

Variables

var (
	// ErrEmptyEventsTableName is returned when an empty table name is provided during event store configuration.
	ErrEmptyEventsTableName = errors.New("events table name must not be empty")

	// ErrConcurrencyConflict is returned when an optimistic concurrency check fails during event appending.
	ErrConcurrencyConflict = errors.New("concurrency error, no rows were affected")

	// ErrNilDatabaseConnection is returned when a nil database connection is provided to event store constructors.
	ErrNilDatabaseConnection = errors.New("database connection must not be nil")

	// ErrQueryingEventsFailed is returned when the underlying database query operation fails during event retrieval.
	ErrQueryingEventsFailed = errors.New("querying events failed")

	// ErrScanningDBRowFailed is returned when database row scanning fails during event retrieval.
	ErrScanningDBRowFailed = errors.New("scanning db row failed")

	// ErrBuildingStorableEventFailed is returned when construction of a StorableEvent from database data fails.
	ErrBuildingStorableEventFailed = errors.New("building storable event failed")

	// ErrAppendingEventFailed is returned when the underlying database execution fails during event appending.
	ErrAppendingEventFailed = errors.New("appending the event failed")

	// ErrGettingRowsAffectedFailed is returned when retrieving the affected row count after database execution fails.
	ErrGettingRowsAffectedFailed = errors.New("getting rows affected failed")

	// ErrBuildingQueryFailed is returned when SQL query construction fails during event store operations.
	ErrBuildingQueryFailed = errors.New("building the query failed")

	// ErrRowsIterationFailed is returned when iterating over database result rows fails during event retrieval.
	ErrRowsIterationFailed = errors.New("database rows iteration failed")
)
var (
	// ErrInvalidPayloadJSON is returned when payload JSON is malformed or invalid.
	ErrInvalidPayloadJSON = errors.New("payload json is not valid")

	// ErrInvalidMetadataJSON is returned when metadata JSON is malformed or invalid.
	ErrInvalidMetadataJSON = errors.New("metadata json is not valid")
)

Functions

This section is empty.

Types

type CompletedFilterItemBuilder

type CompletedFilterItemBuilder interface {
	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil

	// OrMatching finalizes the current FilterItem and starts a new one.
	OrMatching() EmptyFilterItemBuilder

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

CompletedFilterItemBuilder represents a FilterItem with both event types and predicates. You can set time boundaries, add more items, or finalize the filter.

type CompletedFilterItemBuilderWithOccurredFrom

type CompletedFilterItemBuilderWithOccurredFrom interface {
	// AndOccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if AndOccurredUntil is later than OccurredFrom!
	AndOccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredFromToUntil

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

CompletedFilterItemBuilderWithOccurredFrom represents a completed FilterItem with a lower time boundary set. You can optionally add an upper time boundary or finalize the filter.

type CompletedFilterItemBuilderWithOccurredFromToUntil

type CompletedFilterItemBuilderWithOccurredFromToUntil interface {
	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

CompletedFilterItemBuilderWithOccurredFromToUntil represents a completed FilterItem with both time boundaries set. You can only finalize the filter at this stage.

type CompletedFilterItemBuilderWithOccurredUntil

type CompletedFilterItemBuilderWithOccurredUntil interface {
	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

CompletedFilterItemBuilderWithOccurredUntil represents a completed FilterItem with an upper time boundary set. You can only finalize the filter at this stage.

type EmptyFilterItemBuilder

type EmptyFilterItemBuilder interface {
	// AnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
	//
	// It sanitizes the input:
	//	- removing empty EventTypes ("")
	//	- sorting the EventTypes
	//	- removing duplicate EventTypes
	AnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) FilterItemBuilderLackingPredicates

	// AnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes

	// AllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes
}

EmptyFilterItemBuilder represents the initial state when starting a new FilterItem. At this stage, you must add either event types or predicates to proceed.

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

Filter represents a complete event filter with FilterItems and optional time boundaries. Multiple FilterItems are combined with OR logic, while predicates within items follow configurable AND/OR logic based on AllPredicatesMustMatch.

func (Filter) Items

func (f Filter) Items() []FilterItem

Items returns the collection of FilterItems that define the filter criteria.

func (Filter) OccurredFrom

func (f Filter) OccurredFrom() time.Time

OccurredFrom returns the lower time boundary for event filtering (inclusive).

func (Filter) OccurredUntil

func (f Filter) OccurredUntil() time.Time

OccurredUntil returns the upper time boundary for event filtering (inclusive).

type FilterBuilder

type FilterBuilder interface {
	// Matching starts a new FilterItem.
	Matching() EmptyFilterItemBuilder

	// MatchingAnyEvent directly creates an empty Filter.
	// WARNING: This returns ALL events and should not be used in production.
	MatchingAnyEvent() Filter

	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
}

FilterBuilder builds generic event filters for database-specific query implementations. It enforces useful filter combinations for event-sourced workflows, supporting event types, JSON payload predicates, and time ranges. Complex combinations are supported through multiple FilterItems with OR logic between items and configurable AND/OR logic within items.

func BuildEventFilter

func BuildEventFilter() FilterBuilder

BuildEventFilter creates a FilterBuilder which must eventually be finalized with Finalize() or MatchingAnyEvent(). Note: MatchingAnyEvent() returns ALL events and should not be used in production.

type FilterEventTypeString

type FilterEventTypeString = string

FilterEventTypeString represents an event type identifier used in filter criteria.

type FilterItem

type FilterItem struct {
	// contains filtered or unexported fields
}

FilterItem represents a single filter condition combining event types and JSON predicates. Event types are always combined with OR logic, while predicates follow AND / OR logic based on the AllPredicatesMustMatch flag.

func (FilterItem) AllPredicatesMustMatch

func (fi FilterItem) AllPredicatesMustMatch() bool

AllPredicatesMustMatch returns true if all predicates must match (AND logic), false if any predicate can match (OR logic).

func (FilterItem) EventTypes

func (fi FilterItem) EventTypes() []FilterEventTypeString

EventTypes returns the event type identifiers for this filter item.

func (FilterItem) Predicates

func (fi FilterItem) Predicates() []FilterPredicate

Predicates returns the JSON predicates for this filter item.
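
The combination rules described for Filter and FilterItem (OR between items, OR between event types, AND or OR between predicates) can be illustrated with an in-memory matcher. This is a sketch of the semantics only; the actual implementations translate filters into database queries. The types and functions below are hypothetical, and treating an item without event types (or without predicates) as unrestricted in that dimension is an assumption.

```go
package main

import "fmt"

type predicate struct{ key, val string }

type item struct {
	eventTypes             []string
	predicates             []predicate
	allPredicatesMustMatch bool
}

type event struct {
	eventType string
	payload   map[string]string
}

// matchesItem applies one item: event types are ORed, predicates follow the flag.
func matchesItem(it item, ev event) bool {
	if len(it.eventTypes) > 0 {
		found := false
		for _, t := range it.eventTypes {
			if t == ev.eventType {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	if len(it.predicates) == 0 {
		return true
	}
	matched := 0
	for _, p := range it.predicates {
		if got, ok := ev.payload[p.key]; ok && got == p.val {
			matched++
		}
	}
	if it.allPredicatesMustMatch {
		return matched == len(it.predicates)
	}
	return matched > 0
}

// matchesFilter ORs the items; a filter without items matches every event.
func matchesFilter(items []item, ev event) bool {
	if len(items) == 0 {
		return true
	}
	for _, it := range items {
		if matchesItem(it, ev) {
			return true
		}
	}
	return false
}

func main() {
	items := []item{
		{eventTypes: []string{"BookCopyLentToReader"}, predicates: []predicate{{"BookID", "42"}}},
		{predicates: []predicate{{"ReaderID", "7"}, {"BookID", "1"}}, allPredicatesMustMatch: true},
	}
	ev := event{eventType: "BookCopyLentToReader", payload: map[string]string{"BookID": "42"}}
	fmt.Println(matchesFilter(items, ev))
}
```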

type FilterItemBuilderLackingEventTypes

type FilterItemBuilderLackingEventTypes interface {
	// AndAnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
	//
	// It sanitizes the input:
	//	- removing empty EventTypes ("")
	//	- sorting the EventTypes
	//	- removing duplicate EventTypes
	AndAnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) CompletedFilterItemBuilder

	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil

	// OrMatching finalizes the current FilterItem and starts a new one.
	OrMatching() EmptyFilterItemBuilder

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

FilterItemBuilderLackingEventTypes represents a FilterItem with predicates but no event types yet. You can optionally add event types, set time boundaries, add more items, or finalize the filter.

type FilterItemBuilderLackingPredicates

type FilterItemBuilderLackingPredicates interface {
	// AndAnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ANY predicate to match.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AndAnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder

	// AndAllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AndAllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder

	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil

	// OrMatching finalizes the current FilterItem and starts a new one.
	OrMatching() EmptyFilterItemBuilder

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

FilterItemBuilderLackingPredicates represents a FilterItem with event types but no predicates yet. You can optionally add predicates, set time boundaries, add more items, or finalize the filter.

type FilterKeyString

type FilterKeyString = string

FilterKeyString represents a JSON key used in filter predicate matching.

type FilterPredicate

type FilterPredicate struct {
	// contains filtered or unexported fields
}

FilterPredicate represents a key-value pair for matching JSON payload fields.

func P

P creates a new FilterPredicate with the specified key and value.

func (FilterPredicate) Key

Key returns the JSON key for this predicate.

func (FilterPredicate) Val

Val returns the JSON value for this predicate.

type FilterValString

type FilterValString = string

FilterValString represents a JSON value used in filter predicate matching.

type MaxSequenceNumberUint

type MaxSequenceNumberUint = uint

MaxSequenceNumberUint is a type alias for uint, representing the maximum sequence number for a "dynamic event stream".

type StorableEvent

type StorableEvent struct {
	EventType    string
	OccurredAt   time.Time
	PayloadJSON  []byte
	MetadataJSON []byte
}

StorableEvent is a DTO (data transfer object) used by the EventStore to append events and query them back.

It is built on scalars to be completely agnostic of the implementation of Domain Events in the client code.

While its fields are exported, it should only be constructed with the supplied factory functions:

  • BuildStorableEvent
  • BuildStorableEventWithEmptyMetadata

func BuildStorableEvent

func BuildStorableEvent(eventType string, occurredAt time.Time, payloadJSON []byte, metadataJSON []byte) (StorableEvent, error)

BuildStorableEvent is a factory method for StorableEvent.

It populates the StorableEvent with the given scalar input. Returns an error if payloadJSON or metadataJSON are not valid JSON.

func BuildStorableEventWithEmptyMetadata

func BuildStorableEventWithEmptyMetadata(eventType string, occurredAt time.Time, payloadJSON []byte) (StorableEvent, error)

BuildStorableEventWithEmptyMetadata is a factory method for StorableEvent.

It populates the StorableEvent with the given scalar input and creates valid empty JSON for MetadataJSON. Returns an error if payloadJSON is not valid JSON.

type StorableEvents

type StorableEvents = []StorableEvent

StorableEvents is a type alias for a slice of StorableEvent.

Directories

Path Synopsis
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
internal/adapters
Package adapters provides database adapter implementations for the PostgreSQL event store.
