eventstore

package
v1.1.4-beta Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2025 License: GPL-3.0 Imports: 4 Imported by: 0

Documentation

Overview

Package eventstore provides core abstractions and types for event sourcing with Dynamic Event Streams.

This package defines the fundamental interfaces and types used across different event store implementations, including filters, storable events, and common error definitions.

The event store supports dynamic filtering of events based on:

  • Event types
  • JSON payload predicates
  • Time ranges (occurred from/until)

Key types:

  • Filter: Defines criteria for querying events
  • StorableEvent: Represents an event that can be stored and retrieved
  • StorableEvents: Collection of storable events

Common usage pattern:

// Create a filter for multiple event types with a predicate
filter := BuildEventFilter().
	Matching().
	AnyEventTypeOf(
		core.BookCopyLentToReaderEventType,
		core.BookCopyReturnedByReaderEventType).
	AndAnyPredicateOf(P("BookID", bookID.String())).
	Finalize()

events, maxSeq, err := store.Query(ctx, filter)
if err != nil {
	// handle error
}

newEvent, err := eventstore.BuildStorableEvent(eventType, time.Now(), payload, metadata)
if err != nil {
	// handle error
}
err = store.Append(ctx, filter, maxSeq, newEvent)

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrEmptyEventsTableName        = errors.New("events table name must not be empty")
	ErrConcurrencyConflict         = errors.New("concurrency error, no rows were affected")
	ErrNilDatabaseConnection       = errors.New("database connection must not be nil")
	ErrQueryingEventsFailed        = errors.New("querying events failed")
	ErrScanningDBRowFailed         = errors.New("scanning db row failed")
	ErrBuildingStorableEventFailed = errors.New("building storable event failed")
	ErrAppendingEventFailed        = errors.New("appending the event failed")
	ErrGettingRowsAffectedFailed   = errors.New("getting rows affected failed")
	ErrBuildingQueryFailed         = errors.New("building the query failed")
)
View Source
var ErrInvalidMetadataJSON = errors.New("metadata json is not valid")
View Source
var ErrInvalidPayloadJSON = errors.New("payload json is not valid")

Functions

This section is empty.

Types

type CompletedFilterItemBuilder

type CompletedFilterItemBuilder interface {
	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil

	// OrMatching finalizes the current FilterItem and starts a new one.
	OrMatching() EmptyFilterItemBuilder

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

type CompletedFilterItemBuilderWithOccurredFrom

type CompletedFilterItemBuilderWithOccurredFrom interface {
	// AndOccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if AndOccurredUntil is later than OccurredFrom!
	AndOccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredFromToUntil

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

type CompletedFilterItemBuilderWithOccurredFromToUntil

type CompletedFilterItemBuilderWithOccurredFromToUntil interface {
	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

type CompletedFilterItemBuilderWithOccurredUntil

type CompletedFilterItemBuilderWithOccurredUntil interface {
	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

type EmptyFilterItemBuilder

type EmptyFilterItemBuilder interface {
	// AnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
	//
	// It sanitizes the input:
	//	- removing empty EventTypes ("")
	//	- sorting the EventTypes
	//	- removing duplicate EventTypes
	AnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) FilterItemBuilderLackingPredicates

	// AnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes

	// AllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) FilterItemBuilderLackingEventTypes
}

type Filter

type Filter struct {
	// contains filtered or unexported fields
}

func (Filter) Items

func (f Filter) Items() []FilterItem

func (Filter) OccurredFrom

func (f Filter) OccurredFrom() time.Time

func (Filter) OccurredUntil

func (f Filter) OccurredUntil() time.Time

type FilterBuilder

type FilterBuilder interface {
	// Matching starts a new FilterItem.
	Matching() EmptyFilterItemBuilder

	// MatchingAnyEvent directly creates an empty Filter.
	// WARNING: This returns ALL events and should not be used in production.
	MatchingAnyEvent() Filter

	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil
}

FilterBuilder builds generic event filters for database-specific query implementations. It enforces useful filter combinations for event-sourced workflows, supporting event types, JSON payload predicates, and time ranges. Complex combinations are supported through multiple FilterItems with OR logic between items and configurable AND/OR logic within items.

func BuildEventFilter

func BuildEventFilter() FilterBuilder

BuildEventFilter creates a FilterBuilder which must eventually be finalized with Finalize() or MatchingAnyEvent(). Note: MatchingAnyEvent() returns ALL events and should not be used in production.

type FilterEventTypeString

type FilterEventTypeString = string

type FilterItem

type FilterItem struct {
	// contains filtered or unexported fields
}

func (FilterItem) AllPredicatesMustMatch

func (fi FilterItem) AllPredicatesMustMatch() bool

func (FilterItem) EventTypes

func (fi FilterItem) EventTypes() []FilterEventTypeString

func (FilterItem) Predicates

func (fi FilterItem) Predicates() []FilterPredicate

type FilterItemBuilderLackingEventTypes

type FilterItemBuilderLackingEventTypes interface {
	// AndAnyEventTypeOf adds one or multiple EventTypes to the current FilterItem.
	//
	// It sanitizes the input:
	//	- removing empty EventTypes ("")
	//	- sorting the EventTypes
	//	- removing duplicate EventTypes
	AndAnyEventTypeOf(eventType FilterEventTypeString, eventTypes ...FilterEventTypeString) CompletedFilterItemBuilder

	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil

	// OrMatching finalizes the current FilterItem and starts a new one.
	OrMatching() EmptyFilterItemBuilder

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

type FilterItemBuilderLackingPredicates

type FilterItemBuilderLackingPredicates interface {
	// AndAnyPredicateOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ANY predicate to match.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AndAnyPredicateOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder

	// AndAllPredicatesOf adds one or multiple FilterPredicate(s) to the current FilterItem expecting ALL predicates to match.
	//
	// It sanitizes the input:
	//	- removing empty/partial FilterPredicate(s) (key or val is "")
	//	- sorting the FilterPredicate(s)
	//	- removing duplicate FilterPredicate(s)
	AndAllPredicatesOf(predicate FilterPredicate, predicates ...FilterPredicate) CompletedFilterItemBuilder

	// OccurredFrom sets the lower boundary for occurredAt (including this timestamp) for the whole Filter.
	OccurredFrom(occurredAtFrom time.Time) CompletedFilterItemBuilderWithOccurredFrom

	// OccurredUntil sets the upper boundary for occurredAt (including this timestamp) for the whole Filter.
	//
	// Currently, there is NO check if OccurredUntil is later than OccurredFrom!
	OccurredUntil(occurredAtUntil time.Time) CompletedFilterItemBuilderWithOccurredUntil

	// OrMatching finalizes the current FilterItem and starts a new one.
	OrMatching() EmptyFilterItemBuilder

	// Finalize returns the Filter once it has at least one FilterItem with at least one EventType OR one Predicate.
	Finalize() Filter
}

type FilterKeyString

type FilterKeyString = string

type FilterPredicate

type FilterPredicate struct {
	// contains filtered or unexported fields
}

func (FilterPredicate) Key

func (FilterPredicate) Val

type FilterValString

type FilterValString = string

type MaxSequenceNumberUint

type MaxSequenceNumberUint = uint

MaxSequenceNumberUint is a type alias for uint, representing the maximum sequence number for a "dynamic event stream".

type StorableEvent

type StorableEvent struct {
	EventType    string
	OccurredAt   time.Time
	PayloadJSON  []byte
	MetadataJSON []byte
}

StorableEvent is a DTO (data transfer object) used by the EventStore to append events and query them back.

It is built on scalars to be completely agnostic of the implementation of Domain Events in the client code.

While its properties are exported, it should only be constructed with the supplied factory methods:

  • BuildStorableEvent
  • BuildStorableEventWithEmptyMetadata

func BuildStorableEvent

func BuildStorableEvent(eventType string, occurredAt time.Time, payloadJSON []byte, metadataJSON []byte) (StorableEvent, error)

BuildStorableEvent is a factory method for StorableEvent.

It populates the StorableEvent with the given scalar input. Returns an error if payloadJSON or metadataJSON are not valid JSON.

func BuildStorableEventWithEmptyMetadata

func BuildStorableEventWithEmptyMetadata(eventType string, occurredAt time.Time, payloadJSON []byte) (StorableEvent, error)

BuildStorableEventWithEmptyMetadata is a factory method for StorableEvent.

It populates the StorableEvent with the given scalar input and creates valid empty JSON for MetadataJSON. Returns an error if payloadJSON is not valid JSON.

type StorableEvents

type StorableEvents = []StorableEvent

StorableEvents is an alias type for a slice of StorableEvent

Directories

Path Synopsis
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
Package postgresengine provides a PostgreSQL implementation of the eventstore interface.
internal/adapters
Package adapters provide database adapter implementations for the PostgreSQL event store.
Package adapters provide database adapter implementations for the PostgreSQL event store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL