evercore

package
v0.0.42
This package is not in the latest version of its module.
Published: Sep 13, 2025 License: MIT Imports: 8 Imported by: 3

Documentation

Constants

const (
	StartBeginning = "beginning"
	StartEnd       = "end"
	StartEventID   = "event_id"
	StartTimestamp = "timestamp"
)

StartFromKind identifies how a subscription initializes its cursor.

const (
	ErrorNotFound                = "not_found"
	ErrorTypeConstraintViolation = "constraint_violation"
)

Common storage engine error types

Variables

var ErrEventStateIsNotAStateEvent = ErrorMessage("Event state is not a StateEvent.")
var ErrSubscriptionAlreadyOwned = errors.New("subscription already owned")

ErrSubscriptionAlreadyOwned indicates the subscription could not be claimed because another owner currently holds the lease.

var ErrorKeyExceedsMaximumLength error = fmt.Errorf("The specified key exceeds the maximum key length")

Functions

func CopyFields

func CopyFields(e, t any) error

CopyFields uses reflection to copy matching fields from e to t. If a pointer field in e is nil, the field is skipped.
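
The nil-pointer rule is easiest to see with a partial update. The sketch below is a local stand-in written for illustration, not the package's implementation: copyFields, userPatch and user are hypothetical names, and the pointer-to-value copy is an assumption about what "matching fields" means.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// userPatch and user are hypothetical types used only for this illustration.
type userPatch struct {
	Name  *string
	Email *string
}

type user struct {
	Name  string
	Email string
}

// copyFields is a stand-in written for this example, not evercore's CopyFields.
// It copies exported fields from src into dst when the names match, and skips
// nil pointer fields in src so they do not overwrite dst.
func copyFields(src, dst any) error {
	sv := reflect.Indirect(reflect.ValueOf(src))
	dv := reflect.ValueOf(dst)
	if dv.Kind() != reflect.Ptr || dv.IsNil() {
		return errors.New("dst must be a non-nil pointer to a struct")
	}
	dv = dv.Elem()
	if sv.Kind() != reflect.Struct || dv.Kind() != reflect.Struct {
		return errors.New("src and dst must both be structs")
	}
	for i := 0; i < sv.NumField(); i++ {
		field := sv.Type().Field(i)
		if !field.IsExported() {
			continue
		}
		value := sv.Field(i)
		if value.Kind() == reflect.Ptr && value.IsNil() {
			continue // nil pointer: leave the target field untouched
		}
		target := dv.FieldByName(field.Name)
		if !target.IsValid() || !target.CanSet() {
			continue
		}
		switch {
		case value.Type().AssignableTo(target.Type()):
			target.Set(value)
		case value.Kind() == reflect.Ptr && value.Type().Elem().AssignableTo(target.Type()):
			target.Set(value.Elem()) // *T in src, T in dst (assumed behaviour)
		}
	}
	return nil
}

func main() {
	name := "Ada"
	u := user{Name: "old", Email: "old@example.com"}
	if err := copyFields(userPatch{Name: &name}, &u); err != nil {
		fmt.Println("copy failed:", err)
		return
	}
	fmt.Println(u.Name, u.Email)
}
```

Only Name is overwritten here; Email keeps its previous value because the patch left that pointer nil.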

func DecodeEventStateTo

func DecodeEventStateTo[U any](e SerializedEvent, state *U) error

Decodes the event state into the specified type.

func DeserializeFromJson

func DeserializeFromJson[T any](serialized string, target *T) error

func InContext

func InContext[T any](ctx context.Context, store *EventStore, exec contextFuncReturns[T]) (T, error)

InContext executes a callback that returns a value once the context completes. It is similar to the EventStore.WithContext method, except that it allows a return value after completion.
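
The general shape of a value-returning transactional wrapper can be sketched with the standard library alone. Everything below is a local stand-in under stated assumptions: fakeTx and inTx are hypothetical, and the commit-on-success, rollback-on-error behaviour follows the WithContext description rather than the package source.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeTx is a hypothetical transaction that only records what happened to it.
type fakeTx struct {
	committed  bool
	rolledBack bool
}

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

var errRejected = errors.New("rejected")

// inTx runs exec, commits when it succeeds and rolls back when it fails.
// On failure the zero value of T is returned alongside the error.
func inTx[T any](tx *fakeTx, exec func() (T, error)) (T, error) {
	result, err := exec()
	if err != nil {
		var zero T
		_ = tx.Rollback() // rollback error ignored to keep the sketch short
		return zero, err
	}
	if err := tx.Commit(); err != nil {
		var zero T
		return zero, err
	}
	return result, nil
}

func main() {
	tx := &fakeTx{}
	id, err := inTx(tx, func() (int64, error) { return 42, nil })
	fmt.Println(id, err, tx.committed)
}
```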

func InReadonlyContext

func InReadonlyContext[T any](ctx context.Context, store *EventStore, exec readonlyContextFuncReturns[T]) (T, error)

Executes a function within a readonly context returning a value.

func RegisterEventDecoder added in v0.0.27

func RegisterEventDecoder(decoders ...EventDecoder)

RegisterEventDecoder sets the EventDecoder for StateAggregate - the function should be able to decode all events for any aggregate using StateAggregate.
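
A decoder registry of this kind might look like the following. This is a sketch, not the package's code: the lower-case types are local stand-ins, userRenamed is a hypothetical event, and the convention that a decoder returns (nil, nil) for events it does not recognise is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins that mirror the documented shapes.
type serializedEvent struct {
	EventType string
	State     string
}

type eventState interface {
	GetEventType() string
	Serialize() string
}

type eventDecoder func(ev serializedEvent) (eventState, error)

// userRenamed is a hypothetical event used only for this illustration.
type userRenamed struct {
	Name string `json:"name"`
}

func (e userRenamed) GetEventType() string { return "UserRenamed" }

func (e userRenamed) Serialize() string {
	b, _ := json.Marshal(e)
	return string(b)
}

var decoders []eventDecoder

func registerDecoders(d ...eventDecoder) { decoders = append(decoders, d...) }

// decodeUserEvents returns (nil, nil) for event types it does not know,
// which this sketch treats as "not handled".
func decodeUserEvents(ev serializedEvent) (eventState, error) {
	if ev.EventType != "UserRenamed" {
		return nil, nil
	}
	var e userRenamed
	if err := json.Unmarshal([]byte(ev.State), &e); err != nil {
		return nil, err
	}
	return e, nil
}

// decode tries each registered decoder in order and reports whether one matched.
func decode(ev serializedEvent) (bool, eventState, error) {
	for _, d := range decoders {
		state, err := d(ev)
		if err != nil {
			return false, nil, err
		}
		if state != nil {
			return true, state, nil
		}
	}
	return false, nil, nil
}

func main() {
	registerDecoders(decodeUserEvents)
	ok, state, err := decode(serializedEvent{EventType: "UserRenamed", State: `{"name":"Ada"}`})
	fmt.Println(ok, state, err)
}
```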

func SerializeToJson

func SerializeToJson(ev any) string

func TypeNameFromType

func TypeNameFromType[T any](ev T) string

Types

type Aggregate

type Aggregate interface {
	GetId() int64
	SetId(int64)
	GetSequence() int64
	SetSequence(int64)
	GetAggregateType() string
	GetSnapshotFrequency() int64
	GetSnapshotState() (*string, error)
	ApplyEventState(eventState EventState, eventTime time.Time, reference string) error
	ApplySnapshot(snapshot *Snapshot) error
}

Opinionated version of an aggregate.

type AggregateState

type AggregateState struct {
	AggregateId int64
	NaturalKey  *string
	Snapshot    *Snapshot
	Events      EventSlice
}

Represents the state of the aggregate including the latest snapshot (if it exists) and the events required to reconstitute its current state.
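
Reconstituting from a snapshot plus events can be sketched as follows. The types are local stand-ins that copy the documented field names, account is a hypothetical aggregate, and the guard that skips events at or below the snapshot sequence is a defensive assumption rather than documented behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins that mirror the documented field names; they are not the
// evercore types themselves.
type snapshot struct {
	AggregateId int64
	State       string
	Sequence    int64
}

type serializedEvent struct {
	AggregateId int64
	EventType   string
	State       string
	Sequence    int64
}

type aggregateState struct {
	AggregateId int64
	Snapshot    *snapshot
	Events      []serializedEvent
}

// account is a hypothetical aggregate whose events carry a balance delta.
type account struct {
	Balance int `json:"balance"`
}

// rebuild restores the snapshot, if any, and then folds the events on top.
func rebuild(st aggregateState) (account, int64, error) {
	var acc account
	var seq int64
	if st.Snapshot != nil {
		if err := json.Unmarshal([]byte(st.Snapshot.State), &acc); err != nil {
			return account{}, 0, err
		}
		seq = st.Snapshot.Sequence
	}
	for _, ev := range st.Events {
		if ev.Sequence <= seq {
			continue // already reflected in the snapshot
		}
		var delta struct {
			Amount int `json:"amount"`
		}
		if err := json.Unmarshal([]byte(ev.State), &delta); err != nil {
			return account{}, seq, err
		}
		acc.Balance += delta.Amount
		seq = ev.Sequence
	}
	return acc, seq, nil
}

// sampleState builds a snapshot at sequence 2 plus three stored events.
func sampleState() aggregateState {
	return aggregateState{
		AggregateId: 1,
		Snapshot:    &snapshot{AggregateId: 1, State: `{"balance":10}`, Sequence: 2},
		Events: []serializedEvent{
			{AggregateId: 1, EventType: "Adjusted", State: `{"amount":100}`, Sequence: 2},
			{AggregateId: 1, EventType: "Adjusted", State: `{"amount":5}`, Sequence: 3},
			{AggregateId: 1, EventType: "Adjusted", State: `{"amount":-2}`, Sequence: 4},
		},
	}
}

func main() {
	acc, seq, err := rebuild(sampleState())
	fmt.Println(acc.Balance, seq, err)
}
```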

type ApplyFunc added in v0.0.27

type ApplyFunc func(eventState EventState, eventTime time.Time, reference string) error

type ContextOwner

type ContextOwner interface {
	// contains filtered or unexported methods
}

Methods that the EventStoreContext uses.

type ErrorMessage added in v0.0.27

type ErrorMessage string

func (ErrorMessage) Error added in v0.0.27

func (e ErrorMessage) Error() string

type EventDecoder

type EventDecoder func(ev SerializedEvent) (EventState, error)

EventDecoder is a function that decodes an event into an EventState.

type EventSlice

type EventSlice []SerializedEvent

Slice of serialized events.

type EventState

type EventState interface {
	GetEventType() string
	Serialize() string
}

EventState represents the state of an event.

func DecodeEvent added in v0.0.27

func DecodeEvent(ev SerializedEvent) (bool, EventState, error)

DecodeEvent decodes an event into an EventState.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is the main entry point for interacting with the event store.

func NewEventStore

func NewEventStore(storageEngine StorageEngine) *EventStore

func (*EventStore) Close added in v0.0.30

func (store *EventStore) Close() error

Closes the storage engine.

func (*EventStore) RunEphemeralSubscription added in v0.0.42

func (store *EventStore) RunEphemeralSubscription(
	ctx context.Context,
	filter SubscriptionFilter,
	start StartFrom,
	opts Options,
	handler func(context.Context, []SerializedEvent) error,
) error

RunEphemeralSubscription streams events without creating any persisted subscription metadata or cursor. The position is tracked in-process only and is lost when the caller exits. Useful for local caches, projections that can rebuild, etc.

Supports:

  • AggregateType (optional)
  • AggregateKey (optional)
  • EventTypes: if multiple are provided, filtering is applied in-memory. For a single event type, filtering happens in the storage engine query.
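
In-memory filtering for several event types amounts to a set lookup per event. A minimal sketch, assuming a local serializedEvent stand-in and a hypothetical filterByEventTypes helper (the package's internal filter is not shown in this documentation):

```go
package main

import "fmt"

// serializedEvent is a local stand-in with only the fields this sketch needs.
type serializedEvent struct {
	EventID   int64
	EventType string
}

// filterByEventTypes keeps events whose type is in eventTypes.
// An empty filter means "all event types".
func filterByEventTypes(events []serializedEvent, eventTypes []string) []serializedEvent {
	if len(eventTypes) == 0 {
		return events
	}
	wanted := make(map[string]struct{}, len(eventTypes))
	for _, t := range eventTypes {
		wanted[t] = struct{}{}
	}
	var kept []serializedEvent
	for _, ev := range events {
		if _, ok := wanted[ev.EventType]; ok {
			kept = append(kept, ev)
		}
	}
	return kept
}

func main() {
	batch := []serializedEvent{
		{EventID: 1, EventType: "UserCreated"},
		{EventID: 2, EventType: "OrderPlaced"},
		{EventID: 3, EventType: "UserRenamed"},
	}
	for _, ev := range filterByEventTypes(batch, []string{"UserCreated", "UserRenamed"}) {
		fmt.Println(ev.EventID, ev.EventType)
	}
}
```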

func (*EventStore) RunSubscription added in v0.0.42

func (store *EventStore) RunSubscription(
	ctx context.Context,
	name string,
	filter SubscriptionFilter,
	start StartFrom,
	opts Options,
	handler func(context.Context, []SerializedEvent) error,
) error

RunSubscription ensures a durable subscription exists, claims a lease, and processes events using handler with at-least-once semantics.
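
At-least-once delivery means a batch may be handed to the handler more than once, so handlers are usually written to be idempotent. The sketch below shows one common approach, tracking the highest EventID already applied. It uses local stand-in types and a hypothetical projection; it does not call the package.

```go
package main

import (
	"context"
	"fmt"
)

// serializedEvent is a local stand-in with only the fields this sketch needs.
type serializedEvent struct {
	EventID   int64
	EventType string
}

// projection is a hypothetical read model that counts events exactly once.
type projection struct {
	lastEventID int64
	count       int
}

// handle has the same shape as the documented handler parameter. It skips
// events at or below the last applied EventID, which makes redelivery safe
// as long as event ids only ever increase.
func (p *projection) handle(ctx context.Context, batch []serializedEvent) error {
	for _, ev := range batch {
		if err := ctx.Err(); err != nil {
			return err
		}
		if ev.EventID <= p.lastEventID {
			continue // duplicate delivery
		}
		p.count++
		p.lastEventID = ev.EventID
	}
	return nil
}

// redeliver simulates the same batch arriving several times.
func redeliver(p *projection, batch []serializedEvent, times int) error {
	for i := 0; i < times; i++ {
		if err := p.handle(context.Background(), batch); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	p := &projection{}
	batch := []serializedEvent{{EventID: 1, EventType: "A"}, {EventID: 2, EventType: "B"}}
	err := redeliver(p, batch, 3)
	fmt.Println(p.count, p.lastEventID, err)
}
```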

func (*EventStore) SaveState

func (store *EventStore) SaveState(ctx context.Context, tx StorageEngineTxInfo, events EventSlice, snapshots SnapshotSlice) error

This is used at the end of a context to save all the events and snapshots that happened during the context.

func (*EventStore) Warmup

func (store *EventStore) Warmup(ctx context.Context, knownAggregateTypes []string, knownEventTypes []string) error

Warmup pre-loads aggregate and event types into memory to avoid database calls during contexts. This is intended to be called during application startup.

Parameters:

  • knownAggregateTypes: List of aggregate type names to pre-load
  • knownEventTypes: List of event type names to pre-load

This memoizes the type IDs in an internal map, saving database calls during normal operation. Should be called as early as possible in application startup.
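
The effect of warming a memoized lookup can be sketched with a small cache. typeCache below is a hypothetical stand-in, and its lookup method only simulates a database round trip by counting calls; none of it is the package's internal map.

```go
package main

import (
	"fmt"
	"sync"
)

// typeCache is a hypothetical name-to-id cache guarded by a mutex.
type typeCache struct {
	mu      sync.Mutex
	ids     map[string]int64
	dbCalls int
	nextID  int64
}

func newTypeCache() *typeCache {
	return &typeCache{ids: make(map[string]int64)}
}

// lookup stands in for a database round trip and counts how often it runs.
func (c *typeCache) lookup(name string) int64 {
	c.dbCalls++
	c.nextID++
	return c.nextID
}

// id returns the memoized id for name, calling lookup only on a miss.
func (c *typeCache) id(name string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	if id, ok := c.ids[name]; ok {
		return id
	}
	id := c.lookup(name)
	c.ids[name] = id
	return id
}

// warmup resolves every known name up front so later calls hit the cache.
func (c *typeCache) warmup(names ...string) {
	for _, name := range names {
		c.id(name)
	}
}

func main() {
	cache := newTypeCache()
	cache.warmup("User", "Order")
	before := cache.dbCalls
	cache.id("User")
	cache.id("Order")
	fmt.Println(before, cache.dbCalls)
}
```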

func (*EventStore) WithContext

func (store *EventStore) WithContext(ctx context.Context, exec contextFunc) error

Executes a function inside a context. This ensures that all events and snapshots are written after the context completes, or that none are written when an error happens.

func (*EventStore) WithReadonlyContext

func (store *EventStore) WithReadonlyContext(ctx context.Context, exec readonlyContextFunc) error

Executes a callback function within a readonly context.

type EventStoreContext

type EventStoreContext interface {
	EventStoreReadonlyContext
	NewAggregateId(aggregateType string) (int64, error)
	NewAggregateIdWithKey(aggregateType string, naturalKey string) (int64, error)

	LoadOrCreateAggregate(agg Aggregate, naturalKey string) (bool, error)
	CreateAggregateInto(agg Aggregate) error
	CreateAggregateWithKeyInto(agg Aggregate, naturalKey string) error
	ChangeAggregateNaturalKey(id int64, naturalKey string) error
	ApplyEventTo(agg Aggregate, event EventState, time time.Time, reference string) error

	Publish(*SerializedEvent)
	SaveSnapshot(snapshot Snapshot)
}

Represents read/write context access to the event store.

type EventStoreContextType

type EventStoreContextType struct {
	Transaction StorageEngineTxInfo
	// contains filtered or unexported fields
}

Tracks information related to the current event store context.

func (*EventStoreContextType) ApplyEventTo

func (etx *EventStoreContextType) ApplyEventTo(
	agg Aggregate,
	eventState EventState,
	time time.Time,
	reference string) error

Applies an event to the aggregate.

func (*EventStoreContextType) ChangeAggregateNaturalKey added in v0.0.35

func (etx *EventStoreContextType) ChangeAggregateNaturalKey(
	id int64,
	naturalKey string) error

Changes the natural key of an aggregate.

func (*EventStoreContextType) CreateAggregateInto

func (etx *EventStoreContextType) CreateAggregateInto(agg Aggregate) error

Creates a new aggregate of the specified type.

func (*EventStoreContextType) CreateAggregateWithKeyInto

func (etx *EventStoreContextType) CreateAggregateWithKeyInto(
	agg Aggregate,
	naturalKey string) error

Creates a new aggregate of the specified type with the specified natural key.

func (*EventStoreContextType) LoadAggregateState

func (ctx *EventStoreContextType) LoadAggregateState(
	aggregateType string,
	aggregateId int64) (*AggregateState, error)

Loads the most recent snapshot and events from the event store.

func (*EventStoreContextType) LoadAggregateStateByKey

func (ctx *EventStoreContextType) LoadAggregateStateByKey(
	aggregateType string,
	naturalKey string) (*AggregateState, error)

Loads the aggregate state using the natural key.

func (*EventStoreContextType) LoadOrCreateAggregate

func (etx *EventStoreContextType) LoadOrCreateAggregate(
	agg Aggregate,
	naturalKey string) (bool, error)

LoadOrCreateAggregate loads an existing aggregate by natural key if it exists, otherwise creates a new one. Returns true if the aggregate was created, false if loaded.
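
The created/loaded distinction can be illustrated with an in-memory key index. This is a sketch under assumptions: keyIndex, getOrCreate and errKeyTooLong are hypothetical, and the composite key built from type and natural key is one possible way to scope keys per aggregate type.

```go
package main

import (
	"errors"
	"fmt"
)

var errKeyTooLong = errors.New("key exceeds the maximum key length")

// keyIndex is a hypothetical in-memory index from natural key to aggregate id.
type keyIndex struct {
	maxKeyLength int
	nextID       int64
	byKey        map[string]int64
}

func newKeyIndex(maxKeyLength int) *keyIndex {
	return &keyIndex{maxKeyLength: maxKeyLength, byKey: make(map[string]int64)}
}

// getOrCreate returns created=true with a fresh id when the key is new, and
// created=false with the existing id otherwise.
func (k *keyIndex) getOrCreate(aggregateType, naturalKey string) (bool, int64, error) {
	if len(naturalKey) > k.maxKeyLength {
		return false, 0, errKeyTooLong
	}
	composite := aggregateType + "\x00" + naturalKey
	if id, ok := k.byKey[composite]; ok {
		return false, id, nil
	}
	k.nextID++
	k.byKey[composite] = k.nextID
	return true, k.nextID, nil
}

func main() {
	idx := newKeyIndex(64)
	created, id, err := idx.getOrCreate("User", "ada@example.com")
	fmt.Println(created, id, err)
	created, id, err = idx.getOrCreate("User", "ada@example.com")
	fmt.Println(created, id, err)
}
```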

func (*EventStoreContextType) LoadStateByKeyInto

func (etx *EventStoreContextType) LoadStateByKeyInto(
	agg Aggregate,
	naturalKey string) error

Loads the state of an aggregate into the aggregate using the natural key.

func (*EventStoreContextType) LoadStateInto

func (etx *EventStoreContextType) LoadStateInto(
	agg Aggregate,
	aggregateId int64) error

Loads the state of an aggregate into the aggregate.

func (EventStoreContextType) NewAggregateId

func (ctx EventStoreContextType) NewAggregateId(aggregateType string) (int64, error)

Adds a new aggregate stream and returns the resulting id.

func (EventStoreContextType) NewAggregateIdWithKey

func (ctx EventStoreContextType) NewAggregateIdWithKey(
	aggregateType string,
	naturalKey string) (int64, error)

Adds a new aggregate stream with the specified natural key and returns the resulting id.

func (*EventStoreContextType) Publish

func (ctx *EventStoreContextType) Publish(event *SerializedEvent)

Publishes an event for storage once the context completes.

func (*EventStoreContextType) SaveSnapshot

func (etx *EventStoreContextType) SaveSnapshot(snapshot Snapshot)

Stages a snapshot to save once the context completes.

type EventStoreReadonlyContext

type EventStoreReadonlyContext interface {
	LoadStateInto(aggregate Aggregate, id int64) error
	LoadStateByKeyInto(aggregate Aggregate, naturalKey string) error

	LoadAggregateState(aggregateType string, aggregateId int64) (*AggregateState, error)
	LoadAggregateStateByKey(aggregateType string, naturalKey string) (*AggregateState, error)
}

Represents a readonly context access to the event store.

type IdNameMap

type IdNameMap map[int64]string

Represents a map of id to name.

func MapIdToName

func MapIdToName(idNamePair []IdNamePair) IdNameMap

Maps a slice of IdNamePair to a map of id to name.

type IdNamePair

type IdNamePair struct {
	Id   int64
	Name string
}

Represents a pair of id and name.

type MemoryStorageEngine

type MemoryStorageEngine struct {
	CapturedEvents    []StorageEngineEvent
	CapturedSnapshots []Snapshot

	AggregateTypes     map[string]int64
	AggregateTypesInv  map[int64]string
	EventTypes         map[string]int64
	EventTypesInv      map[int64]string
	CountAggregateType int64
	CountEventTypes    int64
	CountAggregates    int64
	AggregateToTypeId  map[int64]int64
	Aggregates         map[string]int64
	AggregateInv       map[int64]*string

	// Subscriptions (in-memory only)
	Subscriptions          map[string]*Subscription
	SubscriptionByID       map[int64]*Subscription
	SubscriptionEventTypes map[int64]map[int64]bool // subscriptionID -> set(eventTypeID)
	NextSubscriptionID     int64
}

Ephemeral memory storage engine useful for testing without a database.

func NewMemoryStorageEngine

func NewMemoryStorageEngine() *MemoryStorageEngine

NewMemoryStorageEngine creates a new in-memory storage engine.

func (*MemoryStorageEngine) AddSubscriptionEventType added in v0.0.42

func (store *MemoryStorageEngine) AddSubscriptionEventType(tx StorageEngineTxInfo, ctx context.Context, subscriptionId int64, eventTypeId int64) error

func (*MemoryStorageEngine) AdvanceSubscriptionCursor added in v0.0.42

func (store *MemoryStorageEngine) AdvanceSubscriptionCursor(tx StorageEngineTxInfo, ctx context.Context, id int64, lastEventId int64) error

func (*MemoryStorageEngine) ChangeAggregateNaturalKey added in v0.0.33

func (store *MemoryStorageEngine) ChangeAggregateNaturalKey(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, naturalKey string) error

func (*MemoryStorageEngine) ClaimSubscription added in v0.0.42

func (store *MemoryStorageEngine) ClaimSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)

func (*MemoryStorageEngine) Close added in v0.0.30

func (store *MemoryStorageEngine) Close() error

func (*MemoryStorageEngine) GetAggregateById

func (store *MemoryStorageEngine) GetAggregateById(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, aggregateId int64) (int64, *string, error)

func (*MemoryStorageEngine) GetAggregateByKey

func (store *MemoryStorageEngine) GetAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)

func (*MemoryStorageEngine) GetAggregateTypeId

func (store *MemoryStorageEngine) GetAggregateTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)

func (*MemoryStorageEngine) GetAggregateTypes

func (store *MemoryStorageEngine) GetAggregateTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

func (*MemoryStorageEngine) GetEventTypeId

func (store *MemoryStorageEngine) GetEventTypeId(tx StorageEngineTxInfo, ctx context.Context, name string) (int64, error)

func (*MemoryStorageEngine) GetEventTypes

func (store *MemoryStorageEngine) GetEventTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

func (*MemoryStorageEngine) GetEventsForAggregate

func (store *MemoryStorageEngine) GetEventsForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, afterSequence int64) ([]SerializedEvent, error)

func (*MemoryStorageEngine) GetEventsForSubscription added in v0.0.42

func (store *MemoryStorageEngine) GetEventsForSubscription(tx StorageEngineTxInfo, ctx context.Context, sub *Subscription, limit int) ([]SerializedEvent, error)

func (*MemoryStorageEngine) GetFirstEventIdFromTimestamp added in v0.0.42

func (store *MemoryStorageEngine) GetFirstEventIdFromTimestamp(tx StorageEngineTxInfo, ctx context.Context, ts time.Time) (int64, error)

func (*MemoryStorageEngine) GetMaxEventId added in v0.0.42

func (store *MemoryStorageEngine) GetMaxEventId(tx StorageEngineTxInfo, ctx context.Context) (int64, error)

func (*MemoryStorageEngine) GetMaxKeyLength

func (stor *MemoryStorageEngine) GetMaxKeyLength() int

func (*MemoryStorageEngine) GetOrCreateAggregateByKey

func (store *MemoryStorageEngine) GetOrCreateAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (bool, int64, error)

func (*MemoryStorageEngine) GetSnapshotForAggregate

func (store *MemoryStorageEngine) GetSnapshotForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64) (*Snapshot, error)

func (*MemoryStorageEngine) GetSubscriptionByName added in v0.0.42

func (store *MemoryStorageEngine) GetSubscriptionByName(tx StorageEngineTxInfo, ctx context.Context, name string) (*Subscription, error)

func (*MemoryStorageEngine) GetTransactionInfo

func (stor *MemoryStorageEngine) GetTransactionInfo() (StorageEngineTxInfo, error)

func (*MemoryStorageEngine) NewAggregate

func (store *MemoryStorageEngine) NewAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64) (int64, error)

func (*MemoryStorageEngine) NewAggregateWithKey

func (store *MemoryStorageEngine) NewAggregateWithKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)

func (*MemoryStorageEngine) ReleaseSubscription added in v0.0.42

func (store *MemoryStorageEngine) ReleaseSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string) error

func (*MemoryStorageEngine) RenewSubscription added in v0.0.42

func (store *MemoryStorageEngine) RenewSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)

func (*MemoryStorageEngine) SetSubscriptionActive added in v0.0.42

func (store *MemoryStorageEngine) SetSubscriptionActive(tx StorageEngineTxInfo, ctx context.Context, id int64, active bool) error

func (*MemoryStorageEngine) UpsertSubscription added in v0.0.42

func (store *MemoryStorageEngine) UpsertSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, aggregateTypeId *int64, eventTypeId *int64, aggregateKey *string, startFrom string, startEventId int64, startTimestamp *time.Time) (int64, error)

func (*MemoryStorageEngine) WriteState

func (store *MemoryStorageEngine) WriteState(tx StorageEngineTxInfo, ctx context.Context, events []StorageEngineEvent, snapshot SnapshotSlice) error

type MemoryStorageEngineTransaction

type MemoryStorageEngineTransaction struct {
}

MemoryStorageEngineTransaction is a transaction for the in-memory storage engine.

func (MemoryStorageEngineTransaction) Commit

func (tx MemoryStorageEngineTransaction) Commit() error

func (MemoryStorageEngineTransaction) Rollback

func (tx MemoryStorageEngineTransaction) Rollback() error

type NameIdMap

type NameIdMap map[string]int64

Represents a map of name to id.

func MapNameToId

func MapNameToId(idNamePair []IdNamePair) NameIdMap

Maps a slice of IdNamePair to a map of name to id.
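
Both lookup directions are plain map builds over the same slice. A minimal sketch with local stand-in types (mapIDToName and mapNameToID are hypothetical re-implementations, shown only to make the resulting shapes concrete):

```go
package main

import "fmt"

// idNamePair mirrors the documented IdNamePair fields.
type idNamePair struct {
	Id   int64
	Name string
}

// mapIDToName indexes the pairs by id.
func mapIDToName(pairs []idNamePair) map[int64]string {
	out := make(map[int64]string, len(pairs))
	for _, p := range pairs {
		out[p.Id] = p.Name
	}
	return out
}

// mapNameToID indexes the pairs by name.
func mapNameToID(pairs []idNamePair) map[string]int64 {
	out := make(map[string]int64, len(pairs))
	for _, p := range pairs {
		out[p.Name] = p.Id
	}
	return out
}

func main() {
	pairs := []idNamePair{{Id: 1, Name: "User"}, {Id: 2, Name: "Order"}}
	fmt.Println(mapIDToName(pairs)[2], mapNameToID(pairs)["User"])
}
```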

type Options added in v0.0.42

type Options struct {
	BatchSize    int
	PollInterval time.Duration
	Lease        time.Duration
	Owner        string // optional; autogenerated if empty
}

Options controls runtime behavior of a subscription runner.

type SerializedEvent

type SerializedEvent struct {
	// EventID is the global, monotonically increasing id from the events table.
	// It may be zero when events are loaded by-aggregate (not via event_log).
	EventID     int64
	AggregateId int64
	EventType   string
	State       string
	Sequence    int64
	Reference   string
	EventTime   time.Time
}

Represents an event to be published with the state serialized.

type Snapshot

type Snapshot struct {
	AggregateId int64
	State       string
	Sequence    int64
}

Represents a snapshot of an aggregate.

type SnapshotSlice

type SnapshotSlice []Snapshot

Slice of snapshots.

type StartFrom added in v0.0.42

type StartFrom struct {
	Kind      string
	EventID   int64
	Timestamp time.Time
}

StartFrom specifies where to start a subscription from.
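
How a start position could translate into an initial cursor is sketched below. The semantics are assumptions made for illustration, not documented behaviour: the cursor is treated as the id of the last event already seen, "event_id" resumes after the given id, and "timestamp" starts at the first event at or after the given time. eventLog and its helpers are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

const (
	startBeginning = "beginning"
	startEnd       = "end"
	startEventID   = "event_id"
	startTimestamp = "timestamp"
)

// startFrom mirrors the documented StartFrom fields.
type startFrom struct {
	Kind      string
	EventID   int64
	Timestamp time.Time
}

// logEntry and eventLog are hypothetical; entries are ordered by ID.
type logEntry struct {
	ID int64
	At time.Time
}

type eventLog []logEntry

func (l eventLog) maxEventID() int64 {
	if len(l) == 0 {
		return 0
	}
	return l[len(l)-1].ID
}

// firstEventIDFrom returns the first id at or after ts, or max+1 if none.
func (l eventLog) firstEventIDFrom(ts time.Time) int64 {
	for _, e := range l {
		if !e.At.Before(ts) {
			return e.ID
		}
	}
	return l.maxEventID() + 1
}

// initialCursor returns the id of the last event considered already seen.
func initialCursor(s startFrom, log eventLog) (int64, error) {
	switch s.Kind {
	case startBeginning:
		return 0, nil
	case startEnd:
		return log.maxEventID(), nil
	case startEventID:
		return s.EventID, nil
	case startTimestamp:
		return log.firstEventIDFrom(s.Timestamp) - 1, nil
	default:
		return 0, fmt.Errorf("unknown start kind %q", s.Kind)
	}
}

// sampleTime returns a fixed base time plus the given number of seconds.
func sampleTime(seconds int) time.Time {
	return time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(seconds) * time.Second)
}

// sampleLog holds three events, one minute apart.
func sampleLog() eventLog {
	return eventLog{{ID: 1, At: sampleTime(0)}, {ID: 2, At: sampleTime(60)}, {ID: 3, At: sampleTime(120)}}
}

func main() {
	cursor, err := initialCursor(startFrom{Kind: startTimestamp, Timestamp: sampleTime(90)}, sampleLog())
	fmt.Println(cursor, err)
}
```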

type StateAggregate

type StateAggregate[T any] struct {
	Id       int64
	State    T
	Sequence int64
	// contains filtered or unexported fields
}

StateAggregate can be used for aggregates that contain simple state. It provides common aggregate functionality and handles state management.

HandleOtherEvents is an optional function that can be set to handle events that don't implement iStateEvent. When an event is applied that isn't a StateEvent, this function will be called if set. If not set, such events will return ErrEventStateIsNotAStateEvent.

Example usage:

aggregate.HandleOtherEvents = func(eventState EventState, eventTime time.Time, reference string) error {
    switch event := eventState.(type) {
    case SetActiveEvent:
        aggregate.State.Active = event.Active
        return nil
    default:
        return fmt.Errorf("unknown event type %s", event.GetEventType())
    }
}

func (*StateAggregate[T]) ApplyEventState

func (t *StateAggregate[T]) ApplyEventState(eventState EventState, eventTime time.Time, reference string) error

ApplyEventState applies an event state to the aggregate

func (*StateAggregate[T]) ApplySnapshot

func (t *StateAggregate[T]) ApplySnapshot(snapshot *Snapshot) error

ApplySnapshot applies a snapshot to the aggregate

func (*StateAggregate[T]) DecodeEvent

func (t *StateAggregate[T]) DecodeEvent(ev SerializedEvent) (EventState, error)

func (*StateAggregate[T]) GetAggregateType

func (t *StateAggregate[T]) GetAggregateType() string

GetAggregateType gets the aggregate type

func (*StateAggregate[T]) GetId

func (t *StateAggregate[T]) GetId() int64

GetId gets the aggregate id

func (*StateAggregate[T]) GetSequence

func (t *StateAggregate[T]) GetSequence() int64

GetSequence gets the aggregate sequence

func (*StateAggregate[T]) GetSnapshotFrequency

func (t *StateAggregate[T]) GetSnapshotFrequency() int64

GetSnapshotFrequency gets the snapshot frequency

func (*StateAggregate[T]) GetSnapshotState

func (t *StateAggregate[T]) GetSnapshotState() (*string, error)

GetSnapshotState gets the snapshot state

func (*StateAggregate[T]) SetId

func (t *StateAggregate[T]) SetId(id int64)

SetId sets the aggregate id

func (*StateAggregate[T]) SetSequence

func (t *StateAggregate[T]) SetSequence(seq int64)

SetSequence sets the aggregate sequence

type StateEvent

type StateEvent[T any] struct {
	State T
}

func NewStateEvent

func NewStateEvent[T any](state T) StateEvent[T]

func (*StateEvent[T]) Deserialize

func (ev *StateEvent[T]) Deserialize(serialized string) error

func (StateEvent[T]) GetEventType

func (ev StateEvent[T]) GetEventType() string

func (StateEvent[T]) GetState

func (ev StateEvent[T]) GetState() any

func (StateEvent[T]) Serialize

func (ev StateEvent[T]) Serialize() string
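
A generic state-carrying event with a JSON round trip might be sketched as follows. The lower-case stateEvent is a local stand-in, profile is a hypothetical state type, and serializing only the State value (rather than the wrapper) is an assumption; the package's wire format is not shown in this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stateEvent is a local stand-in with the same shape as the documented type.
type stateEvent[T any] struct {
	State T
}

func newStateEvent[T any](state T) stateEvent[T] {
	return stateEvent[T]{State: state}
}

// Serialize encodes the wrapped state as JSON, returning "" on failure.
func (ev stateEvent[T]) Serialize() string {
	b, err := json.Marshal(ev.State)
	if err != nil {
		return ""
	}
	return string(b)
}

// Deserialize decodes JSON into the wrapped state.
func (ev *stateEvent[T]) Deserialize(serialized string) error {
	return json.Unmarshal([]byte(serialized), &ev.State)
}

// profile is a hypothetical state type used only for this illustration.
type profile struct {
	Name   string `json:"name"`
	Active bool   `json:"active"`
}

func main() {
	ev := newStateEvent(profile{Name: "Ada", Active: true})
	serialized := ev.Serialize()

	var restored stateEvent[profile]
	err := restored.Deserialize(serialized)
	fmt.Println(serialized, restored.State == ev.State, err)
}
```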

type StorageEngine

type StorageEngine interface {

	// Gets the maximum length of a natural key
	GetMaxKeyLength() int

	// Gets the id for the event type string.  It is added if it does not already exist.
	GetEventTypeId(tx StorageEngineTxInfo, ctx context.Context, typeName string) (int64, error)

	// Gets the id for the aggregate type string.  It is added if it does not already exist.
	GetAggregateTypeId(tx StorageEngineTxInfo, ctx context.Context, typeName string) (int64, error)

	// Adds a new aggregate of the specified type to the store and returns the id.
	NewAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64) (int64, error)

	// Adds a new aggregate of the specified type with the natural key to the store and returns the id.
	NewAggregateWithKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, natrualKey string) (int64, error)

	// Gets aggregate id and corresponding natural key that corresponds to the type and id.
	GetAggregateById(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, aggregateId int64) (int64, *string, error)

	// Gets the aggregate id corresponding to the type and natural key.
	GetAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (int64, error)

	// Gets or creates an aggregate id for the given type and natural key.
	// If an aggregate with the given natural key already exists, returns false and its id.
	// Otherwise creates a new aggregate with the key and returns true and its new id.
	// Returns error if:
	// - The natural key exceeds maximum length (ErrorKeyExceedsMaximumLength)
	// - The aggregate type doesn't exist
	// - There are database errors
	GetOrCreateAggregateByKey(tx StorageEngineTxInfo, ctx context.Context, aggregateTypeId int64, naturalKey string) (bool, int64, error)

	// Changes the natural key of an aggregate.
	ChangeAggregateNaturalKey(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, naturalKey string) error

	// Loads all the aggregate types.
	GetAggregateTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

	// Loads all the event types.
	GetEventTypes(tx StorageEngineTxInfo, ctx context.Context) ([]IdNamePair, error)

	// Gets a snapshot (if any) for the specified aggregate.
	GetSnapshotForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64) (*Snapshot, error)

	// Retrieve events for an aggregate which are after a sequence.
	GetEventsForAggregate(tx StorageEngineTxInfo, ctx context.Context, aggregateId int64, afterSequence int64) ([]SerializedEvent, error)

	// Writes events to storage.
	WriteState(tx StorageEngineTxInfo, ctx context.Context, events []StorageEngineEvent, snapshot SnapshotSlice) error

	// Gets a transaction state to track multiple
	GetTransactionInfo() (StorageEngineTxInfo, error)

	// Closes the storage engine.
	Close() error

	// -------- Durable Subscriptions --------
	// Create or update a subscription by name. If updating, last_event_id is not modified.
	UpsertSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, aggregateTypeId *int64, eventTypeId *int64, aggregateKey *string, startFrom string, startEventId int64, startTimestamp *time.Time) (int64, error)
	// Optionally associate multiple event types to a subscription (enables multi-type filtering).
	AddSubscriptionEventType(tx StorageEngineTxInfo, ctx context.Context, subscriptionId int64, eventTypeId int64) error
	// Fetch a subscription by name.
	GetSubscriptionByName(tx StorageEngineTxInfo, ctx context.Context, name string) (*Subscription, error)
	// Activate/deactivate a subscription.
	SetSubscriptionActive(tx StorageEngineTxInfo, ctx context.Context, id int64, active bool) error

	// Cooperative lease management for distributed runners.
	ClaimSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)
	RenewSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string, lease time.Duration) (bool, error)
	ReleaseSubscription(tx StorageEngineTxInfo, ctx context.Context, name string, owner string) error

	// Event streaming for a subscription and cursor advancement.
	GetEventsForSubscription(tx StorageEngineTxInfo, ctx context.Context, sub *Subscription, limit int) ([]SerializedEvent, error)
	AdvanceSubscriptionCursor(tx StorageEngineTxInfo, ctx context.Context, id int64, lastEventId int64) error

	// Helpers for initializing cursors.
	GetMaxEventId(tx StorageEngineTxInfo, ctx context.Context) (int64, error)
	GetFirstEventIdFromTimestamp(tx StorageEngineTxInfo, ctx context.Context, ts time.Time) (int64, error)
}

A storage engine contains the raw methods for interacting with the event store database.

Storage engine implementations should be safe to call from multiple goroutines simultaneously.

type StorageEngineError

type StorageEngineError struct {
	// The underlying error that triggered this error
	Err error
	// A human-readable message describing the error
	Message string
	// The type of error that occurred
	ErrorType string
}

StorageEngineError represents an error from the storage engine.

func NewStorageEngineError

func NewStorageEngineError(message string, err error) *StorageEngineError

NewStorageEngineError creates a new StorageEngineError.

func (*StorageEngineError) Error

func (e *StorageEngineError) Error() string

Error returns the string representation of the error.

func (*StorageEngineError) Unwrap

func (e *StorageEngineError) Unwrap() error

Unwrap returns the underlying error.
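
Because the error type exposes Unwrap, callers can inspect it through the standard errors package even when it has been wrapped further. The sketch below uses a local stand-in type; loadAggregate, errNoRows and the exact Error() text are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

const errorNotFound = "not_found"

// storageEngineError is a local stand-in with the documented fields.
type storageEngineError struct {
	Err       error
	Message   string
	ErrorType string
}

func (e *storageEngineError) Error() string {
	if e.Err != nil {
		return e.Message + ": " + e.Err.Error()
	}
	return e.Message
}

func (e *storageEngineError) Unwrap() error { return e.Err }

var errNoRows = errors.New("no rows")

// loadAggregate is a hypothetical call that fails with a wrapped storage error.
func loadAggregate(id int64) error {
	cause := &storageEngineError{Err: errNoRows, Message: "aggregate missing", ErrorType: errorNotFound}
	return fmt.Errorf("load aggregate %d: %w", id, cause)
}

// isNotFound reports whether err contains a storage error of the not_found type.
func isNotFound(err error) bool {
	var se *storageEngineError
	return errors.As(err, &se) && se.ErrorType == errorNotFound
}

func main() {
	err := loadAggregate(7)
	fmt.Println(isNotFound(err), errors.Is(err, errNoRows))
	fmt.Println(err)
}
```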

type StorageEngineEvent

type StorageEngineEvent struct {
	AggregateID int64
	Sequence    int64
	EventTypeID int64
	State       string
	EventTime   time.Time
	Reference   string
}

Represents an event mapping to the storage engine.

type StorageEngineTxInfo

type StorageEngineTxInfo interface {
	Commit() error
	Rollback() error
}

StorageEngineTxInfo represents the transaction info needed to track a transaction.

type Subscription added in v0.0.42

type Subscription struct {
	ID              int64
	Name            string
	AggregateTypeID *int64
	EventTypeID     *int64
	AggregateKey    *string
	StartFrom       string
	StartEventID    int64
	StartTimestamp  *time.Time
	LastEventID     int64
	Active          bool
	LeaseOwner      *string
	LeaseExpiresAt  *time.Time
}

Subscription describes a durable reader over the event log with optional filters and a durable cursor stored as last_event_id.
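
The LeaseOwner and LeaseExpiresAt fields suggest a cooperative lease. One way such a claim check could work is sketched below. The rule used here (a claim succeeds when the lease is free, expired, or already held by the same owner) is an assumption, and claim is a hypothetical helper rather than the storage engine's ClaimSubscription.

```go
package main

import (
	"fmt"
	"time"
)

// subscription is a local stand-in with only the lease-related fields.
type subscription struct {
	Name           string
	LeaseOwner     *string
	LeaseExpiresAt *time.Time
}

// claim tries to take or extend the lease at time now and reports success.
func claim(sub *subscription, owner string, lease time.Duration, now time.Time) bool {
	heldByOther := sub.LeaseOwner != nil && *sub.LeaseOwner != owner
	unexpired := sub.LeaseExpiresAt != nil && sub.LeaseExpiresAt.After(now)
	if heldByOther && unexpired {
		return false
	}
	expires := now.Add(lease)
	sub.LeaseOwner = &owner
	sub.LeaseExpiresAt = &expires
	return true
}

// claimSequence replays four claims against one subscription.
func claimSequence() []bool {
	start := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)
	sub := &subscription{Name: "orders"}
	return []bool{
		claim(sub, "worker-a", time.Minute, start),                     // free lease
		claim(sub, "worker-b", time.Minute, start.Add(30*time.Second)), // held by worker-a
		claim(sub, "worker-a", time.Minute, start.Add(40*time.Second)), // same owner extends
		claim(sub, "worker-b", time.Minute, start.Add(3*time.Minute)),  // lease has expired
	}
}

func main() {
	fmt.Println(claimSequence())
}
```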

type SubscriptionFilter added in v0.0.42

type SubscriptionFilter struct {
	AggregateType string   // optional; empty means all aggregates
	EventTypes    []string // optional; empty means all event types
	AggregateKey  *string  // optional
}

SubscriptionFilter controls which events are read by a subscription.
