Documentation ¶
Overview ¶
Package store provides core event sourcing types and persistence interfaces.
This package defines the fundamental building blocks for event sourcing:
- Event types: Event (before persistence) and PersistedEvent (after persistence)
- Stream: Full history for a single aggregate
- Store interfaces: EventStore, EventReader, AggregateStreamReader
- Optimistic concurrency: ExpectedVersion with Any, NoStream, and Exact modes
- Observability: Logger interface for optional structured logging
The postgres package provides the PostgreSQL implementation of these interfaces.
Example usage:
// The variable is named es so that it does not shadow the store package.
es := postgres.NewStore(postgres.DefaultStoreConfig())
tx, _ := db.BeginTx(ctx, nil)
result, err := es.Append(ctx, tx, store.NoStream(), events)
tx.Commit()
Variables ¶
var (
	// ErrOptimisticConcurrency indicates a version conflict during append.
	ErrOptimisticConcurrency = errors.New("optimistic concurrency conflict")
	// ErrNoEvents indicates an attempt to append zero events.
	ErrNoEvents = errors.New("no events to append")
)
Types ¶
type AggregateStreamReader ¶
type AggregateStreamReader interface {
// ReadAggregateStream reads all events for a specific aggregate instance and returns
// them as a Stream containing the aggregate's full history.
// Events are ordered by aggregate_version ascending.
//
// Parameters:
// - aggregateType: the type of aggregate (e.g., "User", "Order")
// - aggregateID: the unique identifier of the aggregate instance (can be UUID string, email, etc.)
// - fromVersion: optional minimum version (inclusive). Pass nil to read from the beginning.
// - toVersion: optional maximum version (inclusive). Pass nil to read to the end.
//
// Examples:
// - ReadAggregateStream(ctx, tx, "User", "550e8400-e29b-41d4-a716-446655440000", nil, nil) - read all events
// - ReadAggregateStream(ctx, tx, "User", id, ptr(5), nil) - read from version 5 onwards
// - ReadAggregateStream(ctx, tx, "User", id, nil, ptr(10)) - read up to version 10
// - ReadAggregateStream(ctx, tx, "User", id, ptr(5), ptr(10)) - read versions 5-10
//
// Returns a Stream with an empty Events slice if no events match the criteria.
// Use stream.Version() to get the current aggregate version.
// Use stream.IsEmpty() to check if any events were found.
ReadAggregateStream(ctx context.Context, tx *sql.Tx, aggregateType string, aggregateID string, fromVersion, toVersion *int64) (Stream, error)
}
AggregateStreamReader defines the interface for reading events for a specific aggregate.
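The examples in the doc comment call a `ptr` helper that this page does not define. The sketch below shows one plausible definition and models the inclusive `fromVersion`/`toVersion` semantics in memory. `inRange` and `filterVersions` are hypothetical names used only for illustration; a real implementation would apply the bounds in SQL.

```go
package main

import "fmt"

// ptr returns a pointer to v, which is convenient for the optional bounds.
func ptr(v int64) *int64 { return &v }

// inRange reports whether version lies within the optional inclusive bounds.
// A nil bound means "unbounded" on that side.
func inRange(version int64, from, to *int64) bool {
	if from != nil && version < *from {
		return false
	}
	if to != nil && version > *to {
		return false
	}
	return true
}

// filterVersions keeps the versions within the bounds, preserving input order.
func filterVersions(versions []int64, from, to *int64) []int64 {
	out := []int64{}
	for _, v := range versions {
		if inRange(v, from, to) {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	all := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
	fmt.Println(filterVersions(all, nil, nil))        // all events
	fmt.Println(filterVersions(all, ptr(5), nil))     // from version 5 onwards
	fmt.Println(filterVersions(all, ptr(5), ptr(10))) // versions 5 through 10
}
```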
type AppendResult ¶
type AppendResult struct {
Events []PersistedEvent
GlobalPositions []int64
}
AppendResult represents the outcome of an Append operation. It contains only the events that were just committed, not the full history. AppendResult must never imply full history - use Stream for that purpose.
func (AppendResult) FromVersion ¶
func (r AppendResult) FromVersion() int64
FromVersion returns the aggregate version before the append. If no events were appended, returns 0. Otherwise, returns the version immediately before the first appended event.
func (AppendResult) ToVersion ¶
func (r AppendResult) ToVersion() int64
ToVersion returns the aggregate version after the append. If no events were appended, returns 0. Otherwise, returns the AggregateVersion of the last appended event.
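As an illustration of the two accessors, here is a minimal sketch of how they could be derived from `Events`. It assumes the appended events carry consecutive versions, so the version before the append is the first event's version minus one. The lowercase types are trimmed stand-ins, not the package's own definitions.

```go
package main

import "fmt"

// persistedEvent is a trimmed stand-in for PersistedEvent.
type persistedEvent struct {
	AggregateVersion int64
}

// appendResult is a trimmed stand-in for AppendResult.
type appendResult struct {
	Events []persistedEvent
}

// FromVersion returns the version before the append, or 0 if nothing was appended.
func (r appendResult) FromVersion() int64 {
	if len(r.Events) == 0 {
		return 0
	}
	return r.Events[0].AggregateVersion - 1
}

// ToVersion returns the version after the append, or 0 if nothing was appended.
func (r appendResult) ToVersion() int64 {
	if len(r.Events) == 0 {
		return 0
	}
	return r.Events[len(r.Events)-1].AggregateVersion
}

func main() {
	// Two events appended to an aggregate that was at version 3.
	r := appendResult{Events: []persistedEvent{{AggregateVersion: 4}, {AggregateVersion: 5}}}
	fmt.Println(r.FromVersion(), r.ToVersion())
}
```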
type Event ¶
type Event struct {
CreatedAt time.Time
AggregateType string
EventType string
AggregateID string
Payload []byte
Metadata []byte
CausationID NullString
CorrelationID NullString
TraceID NullString
EventVersion int
EventID uuid.UUID
}
Event represents an immutable domain event before persistence. Events are value objects without identity until persisted. AggregateVersion and GlobalPosition are assigned by the store during Append.
type EventReader ¶
type EventReader interface {
// ReadEvents reads events starting from the given global position.
// Returns up to limit events ordered by global_position ascending.
//
// WARNING: global_position is BIGSERIAL-backed. PostgreSQL sequences guarantee
// uniqueness, not commit order. A lower position allocated by a concurrent
// transaction may become visible after a higher one has already been returned.
// Advancing a checkpoint to the highest seen position without accounting for
// in-flight gaps can permanently skip events. Async consumers must use a
// gap-aware runtime; do not treat the highest returned position as a safe
// checkpoint frontier under concurrent writers.
ReadEvents(ctx context.Context, tx *sql.Tx, fromPosition int64, limit int) ([]PersistedEvent, error)
}
EventReader defines the interface for reading events sequentially.
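To make the warning concrete, the sketch below advances a checkpoint only across a contiguous run of positions and stops at the first gap. This is one simplified reading of "gap-aware", not the library's runtime: `safeFrontier` is a hypothetical name, and the sketch assumes every gap is eventually filled. PostgreSQL sequences can also leave permanent gaps when a transaction rolls back, so a real consumer would additionally need a rule (for example a timeout) for skipping gaps that never fill.

```go
package main

import "fmt"

// safeFrontier returns the highest position p such that every position in
// (checkpoint, p] appears in positions. positions must be sorted ascending.
func safeFrontier(checkpoint int64, positions []int64) int64 {
	for _, p := range positions {
		if p <= checkpoint {
			continue // already covered by the checkpoint
		}
		if p != checkpoint+1 {
			break // gap: a lower position may still be in flight
		}
		checkpoint = p
	}
	return checkpoint
}

func main() {
	// Position 6 is missing, so the checkpoint must not move past 5
	// even though 7 and 8 are already visible.
	fmt.Println(safeFrontier(3, []int64{4, 5, 7, 8}))
}
```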
type EventStore ¶
type EventStore interface {
// Append atomically appends one or more events within the provided transaction.
// Events must all belong to the same aggregate instance.
// Returns an AppendResult containing the persisted events with assigned versions
// and their global positions, or an error.
//
// The expectedVersion parameter controls optimistic concurrency:
// - Any(): No version check - always succeeds if no other errors
// - NoStream(): Aggregate must not exist - used for aggregate creation
// - Exact(N): Aggregate must be at version N - used for normal updates
//
// The store automatically assigns AggregateVersion to each event:
// - Fetches the current version from the aggregate_heads table (O(1) lookup)
// - Validates against expectedVersion
// - Assigns consecutive versions starting from (current + 1)
// - Updates aggregate_heads with the new version
// - The database unique constraint on (aggregate_type, aggregate_id, aggregate_version)
// enforces optimistic concurrency as a last safety net
//
// Returns ErrOptimisticConcurrency if expectedVersion validation fails or if
// another transaction commits conflicting events between the version check and insert
// (detected via unique constraint violation).
// Returns ErrNoEvents if events slice is empty.
//
// After a successful append:
// - Use result.ToVersion() to get the new aggregate version
// - Use result.Events to access the persisted events with all fields populated
// - Use result.GlobalPositions to get the assigned global positions
Append(ctx context.Context, tx *sql.Tx, expectedVersion ExpectedVersion, events []Event) (AppendResult, error)
}
EventStore defines the interface for appending events.
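The validation and version-assignment steps described in the Append comment can be sketched in memory as follows. This is an illustration under assumptions, not the postgres implementation: the `expectedVersion` struct, its `mode` field, and the helper names are hypothetical, and a current version of 0 is taken to mean that the aggregate has no events yet.

```go
package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("optimistic concurrency conflict")

type mode int

const (
	modeAny mode = iota
	modeNoStream
	modeExact
)

// expectedVersion is a hypothetical stand-in for ExpectedVersion.
type expectedVersion struct {
	mode  mode
	value int64
}

// validate checks the caller's expectation against the current head version.
func validate(ev expectedVersion, current int64) error {
	switch ev.mode {
	case modeNoStream:
		if current != 0 {
			return errConflict
		}
	case modeExact:
		if current != ev.value {
			return errConflict
		}
	}
	return nil // modeAny performs no check
}

// assignVersions returns n consecutive versions starting at current+1.
func assignVersions(current int64, n int) []int64 {
	versions := make([]int64, n)
	for i := range versions {
		versions[i] = current + int64(i) + 1
	}
	return versions
}

func main() {
	current := int64(3)
	if err := validate(expectedVersion{mode: modeExact, value: 3}, current); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println(assignVersions(current, 2))
}
```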
type ExpectedVersion ¶
type ExpectedVersion struct {
// contains filtered or unexported fields
}
ExpectedVersion represents the expected aggregate version for optimistic concurrency control. It is used in the Append operation to declare expectations about the current state of an aggregate.
func Any ¶
func Any() ExpectedVersion
Any returns an ExpectedVersion that skips version validation. Use this when you don't need optimistic concurrency control.
func Exact ¶
func Exact(version int64) ExpectedVersion
Exact returns an ExpectedVersion that requires the aggregate to be at exactly the specified version. Use this for normal command handling with optimistic concurrency control. The version must be non-negative (>= 0). Note that Exact(0) is equivalent to NoStream().
func NoStream ¶
func NoStream() ExpectedVersion
NoStream returns an ExpectedVersion that requires that the aggregate does not exist. Use this when creating a new aggregate to ensure it doesn't already exist. This is useful for enforcing uniqueness constraints via reservation aggregates.
func (ExpectedVersion) IsAny ¶
func (ev ExpectedVersion) IsAny() bool
IsAny returns true if this is an "Any" expected version (no version check).
func (ExpectedVersion) IsExact ¶
func (ev ExpectedVersion) IsExact() bool
IsExact returns true if this is an "Exact" expected version (aggregate must be at specific version).
func (ExpectedVersion) IsNoStream ¶
func (ev ExpectedVersion) IsNoStream() bool
IsNoStream returns true if this is a "NoStream" expected version (aggregate must not exist).
func (ExpectedVersion) String ¶
func (ev ExpectedVersion) String() string
String returns a string representation of the ExpectedVersion.
func (ExpectedVersion) Value ¶
func (ev ExpectedVersion) Value() int64
Value returns the exact version number if this is an Exact expected version. Returns 0 for Any and NoStream.
type GlobalPositionReader ¶
type GlobalPositionReader interface {
// GetLatestGlobalPosition returns the highest global_position currently present in the event log.
// Returns 0 when no events exist.
//
// WARNING: Because global_position is BIGSERIAL-backed, the returned value is not a safe
// checkpoint frontier under concurrent writers. A concurrent transaction holding a lower
// position may commit after this call returns, making that position invisible to any
// consumer that has already advanced its checkpoint past it.
GetLatestGlobalPosition(ctx context.Context, tx *sql.Tx) (int64, error)
}
GlobalPositionReader defines the interface for reading the latest global event position. This is useful for lightweight "new events available" checks without loading full batches.
type Logger ¶
type Logger interface {
// Debug logs debug-level information for detailed troubleshooting.
// Typically used for verbose operational details.
Debug(ctx context.Context, msg string, keyvals ...interface{})
// Info logs informational messages about normal operations.
// Used to track significant events during normal execution.
Info(ctx context.Context, msg string, keyvals ...interface{})
// Error logs error-level information about failures.
// Used to track errors that require attention.
Error(ctx context.Context, msg string, keyvals ...interface{})
}
Logger provides a minimal interface for observability and debugging. It is designed to be optional and non-blocking, with zero overhead when disabled. Users can implement this interface to integrate their preferred logging library.
type NoOpLogger ¶
type NoOpLogger struct{}
NoOpLogger is a logger that does nothing. It can be used as a default when no logging is desired.
func (NoOpLogger) Debug ¶
func (NoOpLogger) Debug(_ context.Context, _ string, _ ...interface{})
Debug implements Logger.
type NullString ¶
NullString represents a string that may be null. It implements database/sql Scanner and Valuer interfaces for SQL interop, but avoids direct dependency on sql.NullString in public types.
func (*NullString) Scan ¶
func (ns *NullString) Scan(value interface{}) error
Scan implements the sql.Scanner interface.
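The type declaration is not shown on this page, so the following is only a sketch of how a nullable string can satisfy both interfaces. The `nullableString` name and its `String` and `Valid` fields are assumptions modelled on `sql.NullString`, not the package's actual definition.

```go
package main

import (
	"database/sql"
	"database/sql/driver"
	"fmt"
)

// nullableString is a hypothetical stand-in for NullString.
type nullableString struct {
	String string
	Valid  bool // false means SQL NULL
}

// Scan implements sql.Scanner for NULL, text, and byte-slice column values.
func (ns *nullableString) Scan(value interface{}) error {
	switch v := value.(type) {
	case nil:
		ns.String, ns.Valid = "", false
	case string:
		ns.String, ns.Valid = v, true
	case []byte:
		ns.String, ns.Valid = string(v), true
	default:
		return fmt.Errorf("nullableString: cannot scan %T", value)
	}
	return nil
}

// Value implements driver.Valuer; an invalid value is written as NULL.
func (ns nullableString) Value() (driver.Value, error) {
	if !ns.Valid {
		return nil, nil
	}
	return ns.String, nil
}

// Compile-time interface checks.
var (
	_ sql.Scanner   = (*nullableString)(nil)
	_ driver.Valuer = nullableString{}
)

func main() {
	var ns nullableString
	if err := ns.Scan([]byte("trace-123")); err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Println(ns.String, ns.Valid)
}
```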
type PersistedEvent ¶
type PersistedEvent struct {
CreatedAt time.Time
AggregateType string
EventType string
AggregateID string
CausationID NullString
Metadata []byte
Payload []byte
CorrelationID NullString
TraceID NullString
GlobalPosition int64
AggregateVersion int64
EventVersion int
EventID uuid.UUID
}
PersistedEvent represents an event that has been stored. It includes the GlobalPosition and AggregateVersion assigned by the event store.
type Stream ¶
type Stream struct {
AggregateType string
AggregateID string
Events []PersistedEvent
}
Stream represents the full historical event stream for a single aggregate. It is immutable after creation and is returned from read operations. Stream must never be returned from Append operations.
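The ReadAggregateStream comment mentions `stream.Version()` and `stream.IsEmpty()`, which are not listed on this page. A minimal sketch of what they might look like, assuming that Version reports the last event's AggregateVersion and 0 for an empty stream (lowercase types are trimmed stand-ins):

```go
package main

import "fmt"

// persistedEvent is a trimmed stand-in for PersistedEvent.
type persistedEvent struct {
	AggregateVersion int64
}

// stream is a trimmed stand-in for Stream.
type stream struct {
	AggregateType string
	AggregateID   string
	Events        []persistedEvent
}

// IsEmpty reports whether the read returned no events.
func (s stream) IsEmpty() bool { return len(s.Events) == 0 }

// Version returns the version of the last event, or 0 for an empty stream.
// Events are assumed to be ordered by AggregateVersion ascending.
func (s stream) Version() int64 {
	if s.IsEmpty() {
		return 0
	}
	return s.Events[len(s.Events)-1].AggregateVersion
}

func main() {
	s := stream{
		AggregateType: "User",
		AggregateID:   "550e8400-e29b-41d4-a716-446655440000",
		Events:        []persistedEvent{{AggregateVersion: 1}, {AggregateVersion: 2}},
	}
	fmt.Println(s.IsEmpty(), s.Version())
}
```

Under these assumptions, the value returned by Version is what a command handler would pass to Exact on its next Append.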
Directories ¶
| Path | Synopsis |
|---|---|
| cmd/eventmap-gen (command) | Command eventmap-gen generates mapping code between domain events and event sourcing types. |
| cmd/migrate-gen (command) | Command migrate-gen generates SQL migration files for event sourcing. |
| consumer | Package consumer provides event consumer interface definitions. |
| eventmap | Package eventmap provides code generation for mapping between domain events and eventsalsa event sourcing types (store.Event and store.PersistedEvent). |
| examples/basic (command) | Package main demonstrates basic usage of the eventsalsa event store. |
| examples/eventmap-codegen (command) | |
| migrations | Package migrations provides SQL migration generation. |
| postgres | Package postgres provides a PostgreSQL implementation for the event store. |