Documentation ¶
Overview ¶
Package es provides core event sourcing infrastructure.
This package defines the fundamental types and interfaces for event sourcing:
- Event: immutable domain events
- DBTX: database transaction abstraction
- EventStore: event persistence interface
- Projection: event processing interface
Design Philosophy ¶
Clean Architecture: Core interfaces are database-agnostic. Infrastructure concerns (like PostgreSQL) are isolated in adapter packages.
Transaction Control: The library uses DBTX instead of managing transactions. This gives you full control over transaction boundaries and allows combining event operations with other database work atomically.
Immutability: Events are value objects. They don't have identity until persisted and assigned a global_position by the event store.
Quick Start ¶
1. Generate database migrations:
go run github.com/getpup/pupsourcing/cmd/migrate-gen -output migrations
2. Apply migrations to your database
3. Create an event store:
import (
	"github.com/getpup/pupsourcing/es"
	"github.com/getpup/pupsourcing/es/adapters/postgres"
)

store := postgres.NewStore(postgres.DefaultStoreConfig())
4. Append events:
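tx, err := db.BeginTx(ctx, nil)
if err != nil {
	return err
}
// Rollback after a successful Commit has no effect (it returns sql.ErrTxDone),
// so deferring it covers every early return.
defer tx.Rollback()

events := []es.Event{
	{
		AggregateType: "Order",
		AggregateID:   orderID,
		EventID:       uuid.New(),
		EventType:     "OrderCreated",
		EventVersion:  1,
		Payload:       payload,
		Metadata:      []byte(`{}`),
		CreatedAt:     time.Now(),
	},
}

// NoStream asserts that the aggregate has no events yet.
result, err := store.Append(ctx, tx, es.NoStream(), events)
if err != nil {
	return err
}
if err := tx.Commit(); err != nil {
	return err
}
log.Printf("appended events up to aggregate version %d", result.ToVersion())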
5. Process events with projections:
import "github.com/getpup/pupsourcing/es/projection"

type MyProjection struct{}

func (p *MyProjection) Name() string { return "my_projection" }

func (p *MyProjection) Handle(ctx context.Context, tx es.DBTX, event es.PersistedEvent) error {
	// Process event
	return nil
}

config := projection.DefaultProcessorConfig()
processor := projection.NewProcessor(db, store, &config)
processor.Run(ctx, &MyProjection{})
Optimistic Concurrency ¶
The library enforces optimistic concurrency via aggregate_version. When appending events:
- The first event's version must be current_version + 1
- Subsequent events must have sequential versions
- Version conflicts return ErrOptimisticConcurrency
This prevents race conditions when multiple processes modify the same aggregate.
Projections ¶
Projections process events sequentially and track their progress via checkpoints. They can be:
- Long-running (endless processing with context cancellation)
- Horizontally scaled (via deterministic hash partitioning)
- Resumed after failure (from last checkpoint)
See the projection package for details.
Database Schema ¶
Events are stored in a table with:
- global_position: BIGSERIAL primary key for global ordering
- aggregate_type, aggregate_id: identify the aggregate
- aggregate_version: for optimistic concurrency
- event_id: unique identifier (UUID)
- payload: BYTEA for flexible serialization
- trace_id, correlation_id, causation_id: for distributed tracing
- metadata: JSONB for additional context
Checkpoints are stored separately per projection.
Design Decisions ¶
BYTEA for payload: Supports any serialization (JSON, Protobuf, Avro). Users choose their encoding.
DBTX interface: Works with *sql.DB and *sql.Tx. No transaction management in the library keeps it focused on event sourcing.
Pull-based projections: Projections read events in batches. This is simpler than push-based and works well with checkpoint-based resumption.
Hash-based partitioning: Events for the same aggregate go to the same partition. This maintains ordering while enabling horizontal scaling.
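To illustrate deterministic hash partitioning, here is a small sketch. The choice of FNV-1a and the `partitionFor` and `ownedBy` helpers are assumptions made for this example; the hash function the library actually uses is not stated in this document.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps an aggregate ID to a partition in [0, total).
// total must be greater than zero.
func partitionFor(aggregateID string, total int) int {
	h := fnv.New32a()
	h.Write([]byte(aggregateID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(total))
}

// ownedBy reports whether the worker with the given index should process
// events for this aggregate. Every worker evaluates the same function, so
// exactly one of them claims each aggregate.
func ownedBy(aggregateID string, worker, total int) bool {
	return partitionFor(aggregateID, total) == worker
}

func main() {
	for _, id := range []string{"order-1", "order-2", "order-3"} {
		fmt.Printf("%s -> partition %d of 4\n", id, partitionFor(id, 4))
	}
}
```

Because the partition depends only on the aggregate ID, all events for one aggregate are handled by the same worker, which is what preserves per-aggregate ordering.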
Package es provides core event sourcing interfaces and types.
Types ¶
type AppendResult ¶
type AppendResult struct {
	Events          []PersistedEvent
	GlobalPositions []int64
}
AppendResult represents the outcome of an Append operation. It contains only the events written by that call, not the aggregate's full history; use Stream when the full history is needed.
func (AppendResult) FromVersion ¶
func (r AppendResult) FromVersion() int64
FromVersion returns the aggregate version before the append. If no events were appended, returns 0. Otherwise, returns the version immediately before the first appended event.
func (AppendResult) ToVersion ¶
func (r AppendResult) ToVersion() int64
ToVersion returns the aggregate version after the append. If no events were appended, returns 0. Otherwise, returns the AggregateVersion of the last appended event.
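The documented behavior of FromVersion and ToVersion can be restated as code. The lowercase types below are local mirrors written for this sketch, following only the descriptions above; they are not the library's source.

```go
package main

import "fmt"

// persistedEvent mirrors only the field this sketch needs.
type persistedEvent struct {
	AggregateVersion int64
}

// appendResult mirrors the Events field of AppendResult.
type appendResult struct {
	Events []persistedEvent
}

// fromVersion returns the version immediately before the first appended
// event, or 0 when nothing was appended.
func (r appendResult) fromVersion() int64 {
	if len(r.Events) == 0 {
		return 0
	}
	return r.Events[0].AggregateVersion - 1
}

// toVersion returns the version of the last appended event, or 0 when
// nothing was appended.
func (r appendResult) toVersion() int64 {
	if len(r.Events) == 0 {
		return 0
	}
	return r.Events[len(r.Events)-1].AggregateVersion
}

func main() {
	r := appendResult{Events: []persistedEvent{
		{AggregateVersion: 4},
		{AggregateVersion: 5},
		{AggregateVersion: 6},
	}}
	fmt.Println(r.fromVersion(), r.toVersion()) // prints "3 6"

	empty := appendResult{}
	fmt.Println(empty.fromVersion(), empty.toVersion()) // prints "0 0"
}
```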
type DBTX ¶
type DBTX interface {
	ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
DBTX is a minimal interface for database operations. It is implemented by both *sql.DB and *sql.Tx, allowing the library to be transaction-agnostic.
This design gives callers full control over transaction boundaries while keeping the library focused on event sourcing concerns.
type Event ¶
type Event struct {
	CreatedAt      time.Time
	BoundedContext string
	AggregateType  string
	EventType      string
	AggregateID    string
	Payload        []byte
	Metadata       []byte
	CausationID    NullString
	CorrelationID  NullString
	TraceID        NullString
	EventVersion   int
	EventID        uuid.UUID
}
Event represents an immutable domain event before persistence. Events are value objects without identity until persisted. AggregateVersion and GlobalPosition are assigned by the store during Append.
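Since Payload is a plain byte slice, the caller picks the encoding. The following sketch uses JSON from the standard library; the `orderCreated` struct and both helper functions are hypothetical names chosen for illustration, and Protobuf or Avro would work equally well.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// orderCreated is a hypothetical domain event used only for this example.
type orderCreated struct {
	OrderID    string `json:"order_id"`
	TotalCents int64  `json:"total_cents"`
}

// encodePayload produces the bytes that would be assigned to Event.Payload.
func encodePayload(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// decodeOrderCreated reverses encodePayload for this one event type.
func decodeOrderCreated(payload []byte) (orderCreated, error) {
	var e orderCreated
	err := json.Unmarshal(payload, &e)
	return e, err
}

func main() {
	payload, err := encodePayload(orderCreated{OrderID: "o-1", TotalCents: 1250})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(payload)) // prints {"order_id":"o-1","total_cents":1250}

	decoded, err := decodeOrderCreated(payload)
	fmt.Println(decoded.OrderID, decoded.TotalCents, err) // prints "o-1 1250 <nil>"
}
```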
type ExpectedVersion ¶
type ExpectedVersion struct {
	// contains filtered or unexported fields
}
ExpectedVersion represents the expected aggregate version for optimistic concurrency control. It is used in the Append operation to declare expectations about the current state of an aggregate.
func Any ¶
func Any() ExpectedVersion
Any returns an ExpectedVersion that skips version validation. Use this when you don't need optimistic concurrency control.
func Exact ¶
func Exact(version int64) ExpectedVersion
Exact returns an ExpectedVersion that enforces the aggregate must be at exactly the specified version. Use this for normal command handling with optimistic concurrency control. The version must be non-negative (>= 0). Note that Exact(0) is equivalent to NoStream().
func NoStream ¶
func NoStream() ExpectedVersion
NoStream returns an ExpectedVersion that enforces the aggregate must not exist. Use this when creating a new aggregate to ensure it doesn't already exist. This is useful for enforcing uniqueness constraints via reservation aggregates.
func (ExpectedVersion) IsAny ¶
func (ev ExpectedVersion) IsAny() bool
IsAny returns true if this is an "Any" expected version (no version check).
func (ExpectedVersion) IsExact ¶
func (ev ExpectedVersion) IsExact() bool
IsExact returns true if this is an "Exact" expected version (aggregate must be at specific version).
func (ExpectedVersion) IsNoStream ¶
func (ev ExpectedVersion) IsNoStream() bool
IsNoStream returns true if this is a "NoStream" expected version (aggregate must not exist).
func (ExpectedVersion) String ¶
func (ev ExpectedVersion) String() string
String returns a string representation of the ExpectedVersion.
func (ExpectedVersion) Value ¶
func (ev ExpectedVersion) Value() int64
Value returns the exact version number if this is an Exact expected version. Returns 0 for Any and NoStream.
type Logger ¶
type Logger interface {
	// Debug logs debug-level information for detailed troubleshooting.
	// Typically used for verbose operational details.
	Debug(ctx context.Context, msg string, keyvals ...interface{})
	// Info logs informational messages about normal operations.
	// Used to track significant events during normal execution.
	Info(ctx context.Context, msg string, keyvals ...interface{})
	// Error logs error-level information about failures.
	// Used to track errors that require attention.
	Error(ctx context.Context, msg string, keyvals ...interface{})
}
Logger provides a minimal interface for observability and debugging. It is designed to be optional and non-blocking, with zero overhead when disabled. Users can implement this interface to integrate their preferred logging library.
type NoOpLogger ¶
type NoOpLogger struct{}
NoOpLogger is a logger that does nothing. It can be used as a default when no logging is desired.
func (NoOpLogger) Debug ¶
func (NoOpLogger) Debug(_ context.Context, _ string, _ ...interface{})
Debug implements Logger.
type NullString ¶
NullString represents a string that may be null. It implements database/sql Scanner and Valuer interfaces for SQL interop, but avoids direct dependency on sql.NullString in public types.
func (*NullString) Scan ¶
func (ns *NullString) Scan(value interface{}) error
Scan implements the sql.Scanner interface.
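The type's declaration is not shown on this page, so the following is only a sketch of how a nullable string can implement both interfaces. The `nullString` name and its `String` and `Valid` fields are assumptions modeled on sql.NullString, not the library's actual definition.

```go
package main

import (
	"database/sql"
	"database/sql/driver"
	"fmt"
)

// nullString is a hypothetical stand-in; its fields are an assumption.
type nullString struct {
	String string
	Valid  bool // false represents SQL NULL
}

// Compile-time checks for the two database/sql interfaces.
var (
	_ sql.Scanner   = (*nullString)(nil)
	_ driver.Valuer = nullString{}
)

// Scan accepts NULL, string, and []byte column values.
func (ns *nullString) Scan(value interface{}) error {
	switch v := value.(type) {
	case nil:
		ns.String, ns.Valid = "", false
	case string:
		ns.String, ns.Valid = v, true
	case []byte:
		ns.String, ns.Valid = string(v), true
	default:
		return fmt.Errorf("nullString: cannot scan %T", value)
	}
	return nil
}

// Value returns nil for NULL and the string otherwise.
func (ns nullString) Value() (driver.Value, error) {
	if !ns.Valid {
		return nil, nil
	}
	return ns.String, nil
}

func main() {
	var trace nullString
	if err := trace.Scan([]byte("trace-123")); err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	v, _ := trace.Value()
	fmt.Println(trace.Valid, v) // prints "true trace-123"

	_ = trace.Scan(nil) // scanning nil cannot fail in this sketch
	v, _ = trace.Value()
	fmt.Println(trace.Valid, v) // prints "false <nil>"
}
```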
type PersistedEvent ¶
type PersistedEvent struct {
	CreatedAt        time.Time
	BoundedContext   string
	AggregateType    string
	EventType        string
	AggregateID      string
	CausationID      NullString
	Metadata         []byte
	Payload          []byte
	CorrelationID    NullString
	TraceID          NullString
	GlobalPosition   int64
	AggregateVersion int64
	EventVersion     int
	EventID          uuid.UUID
}
PersistedEvent represents an event that has been stored. It includes the GlobalPosition and AggregateVersion assigned by the event store.
type Stream ¶
type Stream struct {
	BoundedContext string
	AggregateType  string
	AggregateID    string
	Events         []PersistedEvent
}
Stream represents the full historical event stream for a single aggregate. It is immutable after creation and is returned from read operations. Stream must never be returned from Append operations.
Directories ¶
| Path | Synopsis |
|---|---|
| adapters | |
| adapters/mysql | Package mysql provides a MySQL/MariaDB adapter for event sourcing. |
| adapters/postgres | Package postgres provides a PostgreSQL adapter for event sourcing. |
| adapters/sqlite | Package sqlite provides a SQLite adapter for event sourcing. |
| eventmap | Package eventmap provides code generation for mapping between domain events and pupsourcing event sourcing types (es.Event and es.PersistedEvent). |
| migrations | Package migrations provides SQL migration generation. |
| projection | Package projection provides projection processing capabilities. |
| projection/runner | Package runner provides optional tooling for running multiple projections and scaling them safely. |
| store | Package store provides event store abstractions and implementations. |