Documentation
¶
Overview ¶
Package es provides an event sourcing framework for building event-driven applications.
Overview ¶
Event sourcing is a pattern where application state is stored as a sequence of events rather than as current state snapshots. This package provides the core abstractions and implementations for building event-sourced systems in Go.
Core Components ¶
The package provides several key components:
Aggregate: The domain object that encapsulates business logic and state changes. Events are raised within aggregates and applied to update internal state. Use BaseAggregate as an embeddable helper that tracks version and uncommitted events.
type User struct {
es.BaseAggregate
Name string
Email string
}
func (u *User) ChangeName(name string) error {
return es.RaiseAndApply(u, &NameChanged{Name: name})
}
EventStore: The persistence layer for events. It provides [EventStore.Load] to retrieve events for an aggregate and [EventStore.Append] to persist new events with optimistic concurrency control. Use NewInMemoryStore for testing or implement the interface for production storage (e.g., NATS JetStream via the adapters/nats package).
Repository: The application-level interface for working with aggregates. It handles loading aggregates by replaying events and saving new events. Use NewTypedRepository for type-safe operations with generics:
repo := es.NewTypedRepository[*User](log, store, registry)
user, err := repo.GetByID(ctx, "user-123")
user.ChangeName("New Name")
repo.Save(ctx, user)
Consumer: Processes events from the store for building read models or triggering side effects. Supports checkpointing for exactly-once semantics and live mode detection to distinguish historical replay from real-time events:
consumer := es.NewConsumer(store, registry, handler,
es.WithConsumerName("user-projector"),
es.WithMiddlewares(es.NewCheckpointMiddleware(checkpointStore)),
)
consumer.Start(ctx)
Event Registration ¶
Events must be registered with an EventRegistry before they can be decoded:
registry := es.NewRegistry()
es.RegisterEvents(registry, es.Event[NameChanged](), es.Event[EmailChanged]())
Snapshots ¶
For aggregates with many events, snapshots can optimize loading by capturing state at a point in time. Implement Snapshottable for custom serialization, or let the framework use JSON marshaling as a fallback:
// Load with snapshot optimization
user, err := repo.GetByID(ctx, "user-123", es.WithSnapshot(true))
// Save with snapshot
repo.Save(ctx, user, es.WithSnapshot(true))
Concurrency Control ¶
The framework uses optimistic concurrency via the Version type. When saving, the repository checks that the aggregate's version matches the store's version. If another process has modified the aggregate, ErrConcurrencyConflict is returned.
For serialized access to a single aggregate, use [TypedRepository.WithTransaction]:
repo.WithTransaction(ctx, "user-123", func(user *User) error {
return user.ChangeName("New Name")
})
Environment ¶
The [Env] type provides a factory for creating configured instances of stores, repositories, and consumers with shared configuration:
env := es.NewEnvironment(
es.WithLog(logger),
es.WithStore(natsStore),
es.WithEvent[NameChanged](),
es.WithAggregates(&User{}),
)
repo := env.Repository()
consumer := env.NewConsumer(handler)
Index ¶
- Variables
- func ApplySnapshot(ctx context.Context, snapshotter Snapshotter, agg Aggregate) (err error)
- func Event[T any]() func() any
- func RaiseAndApply(a raiseApplier, events ...any) (err error)
- func RaiseAndApplyD(a Aggregate, events ...any) func() error
- func RegisterEventFor[T any](r Registrar)
- func RegisterEvents(r Registrar, ctors ...func() any)
- type AggCpStore
- type AggCpStoreOption
- type Aggregate
- type AggregateCreatedEvent
- type AggregateDeleted
- type AggregateOption
- type Applier
- type BaseAggregate
- func (b *BaseAggregate) Apply(evt any) error
- func (b *BaseAggregate) Checked(c assert.Cond, thenFunc func() error) error
- func (b *BaseAggregate) ClearUncommitted()
- func (b *BaseAggregate) Create(id string) error
- func (b *BaseAggregate) GetCreatedAt() time.Time
- func (b *BaseAggregate) GetID() string
- func (b *BaseAggregate) GetSeq() uint64
- func (b *BaseAggregate) GetVersion() Version
- func (b *BaseAggregate) IsCreated() bool
- func (b *BaseAggregate) Raise(event any)
- func (b *BaseAggregate) SetID(id string)
- func (b *BaseAggregate) Uncommitted() []any
- type Checkpoint
- type Consumer
- type ConsumerNameOption
- type ConsumerOption
- type ConsumerOptions
- type ConsumerShutdownTimeoutOption
- type ContextOption
- type CpStore
- type CpStoreOption
- type Decoder
- type DeliverPolicy
- type ESMetrics
- type ESMetricsOption
- type Env
- func (e *Env) Append(ctx context.Context, aggType string, aggID string, expect Version, ...) error
- func (e *Env) NewConsumer(handler Handler, opts ...ConsumerOption) *Consumer
- func (e *Env) Repository() Repository
- func (e *Env) Shutdown()
- func (e *Env) Snapshotter() Snapshotter
- func (e *Env) Start() (err error)
- func (e *Env) Store() EventStore
- type EnvConsumerOption
- type EnvOption
- type EnvOpts
- type Envelope
- type EventRegisterOption
- type EventRegistry
- type EventStore
- type HandleFunc
- type Handler
- type HandlerLifecycleShutdown
- type HandlerLifecycleStart
- type HandlerMiddleware
- type IDGenerator
- type InMemAggCpStore
- type InMemCpStore
- type InMemoryStore
- func (s *InMemoryStore) Append(_ context.Context, aggType string, aggID string, expectVersion Version, ...) (*StoreAppendResult, error)
- func (s *InMemoryStore) Load(_ context.Context, aggType, aggID string, opts ...StoreLoadOption) ([]Envelope, error)
- func (s *InMemoryStore) Subscribe(ctx context.Context, opts ...SubscribeOption) (Subscription, error)
- type KeyValueSnapshotter
- type LoadAndSaveOption
- type LoadOption
- type LoadOptsOption
- type LogOption
- type MemoryOption
- type MiddlewareHandleFunc
- type MiddlewareOption
- type MsgCtx
- func (c *MsgCtx) AggregateID() string
- func (c *MsgCtx) AggregateType() string
- func (c *MsgCtx) Context() context.Context
- func (c *MsgCtx) Data() json.RawMessage
- func (c *MsgCtx) Envelope() Envelope
- func (c *MsgCtx) Event() any
- func (c *MsgCtx) Live() bool
- func (c *MsgCtx) Log() *slog.Logger
- func (c *MsgCtx) OccurredAt() time.Time
- func (c *MsgCtx) Seq() uint64
- func (c *MsgCtx) Type() string
- func (c *MsgCtx) Version() Version
- type MultiOption
- type NoopAggCpStore
- type Projection
- type Registrar
- type RepoCacheOption
- type RepoCreateOption
- type RepoIDGeneratorOption
- type RepoUseCacheOption
- type Repository
- type RepositoryOption
- type SaveOption
- type SaveOptsOption
- type Snapshot
- type SnapshotOption
- type SnapshotProjection
- func (p *SnapshotProjection[T]) GetLastSeq() (uint64, error)
- func (p *SnapshotProjection[T]) Handle(msgCtx MsgCtx) error
- func (p *SnapshotProjection[T]) Name() string
- func (p *SnapshotProjection[T]) Projection() T
- func (p *SnapshotProjection[T]) Shutdown(ctx context.Context) error
- func (p *SnapshotProjection[T]) Start(ctx context.Context) error
- type SnapshotProjectionOption
- type SnapshotSaveOpts
- type SnapshotTTLOption
- type Snapshottable
- type SnapshottableProjection
- type Snapshotter
- type SnapshotterOption
- type StartSeqOption
- type StoreAppendResult
- type StoreLoadOption
- type StoreOption
- type Stream
- type SubscribeFilter
- type SubscribeOption
- type SubscribeOpts
- type Subscription
- type TestingEnv
- type TestingEnvAssert
- type TypedRepository
- type Version
- type WithTransactionOption
Constants ¶
This section is empty.
Variables ¶
var (
ErrAggregateNotFound = errors.New("aggregate not found")
ErrConcurrencyConflict = errors.New("concurrency conflict")
ErrUnknownEventType = errors.New("unknown event type")
)
var (
ErrSnapshotterUnconfigured = errors.New("no snapshotter configured")
ErrSnapshotNotFound = errors.New("snapshot not found")
)
var (
ErrCheckpointNotFound = errors.New("checkpoint not found")
)
var (
ErrStoreNoEvents = errors.New("no events to store")
)
Functions ¶
func ApplySnapshot ¶
func ApplySnapshot(ctx context.Context, snapshotter Snapshotter, agg Aggregate) (err error)
func Event ¶
Event returns a reflection-free constructor for an event of type T. Each call to the returned function constructs a fresh *T via new(T).
func RaiseAndApply ¶
RaiseAndApply records each of the given events as uncommitted on the aggregate and applies it to mutate state.
func RaiseAndApplyD ¶
RaiseAndApplyD returns a deferred function that calls RaiseAndApply. This is useful with BaseAggregate.Checked for conditional event application:
return user.Checked(
assert.NotEmpty(user.Email, "email required"),
es.RaiseAndApplyD(user, &EmailChanged{Email: newEmail}),
)
func RegisterEventFor ¶
func RegisterEvents ¶
RegisterEvents registers event constructors. It does not use reflection to create instances. For each provided constructor, we call it once to determine the event type name and then register the original constructor so future decodes produce fresh instances per call.
Types ¶
type AggCpStore ¶
type AggCpStore interface {
Get(ctx context.Context, projectionName, aggKey string) (lastVersion Version, err error)
Set(ctx context.Context, projectionName, aggKey string, lastVersion Version) error
}
func NewNoopCpStore ¶
func NewNoopCpStore() AggCpStore
type AggCpStoreOption ¶
type AggCpStoreOption valueOption[AggCpStore]
func WithAggregateCheckpointStore ¶
func WithAggregateCheckpointStore(cps AggCpStore) AggCpStoreOption
type Aggregate ¶
type Aggregate interface {
// GetAggType returns the aggregate type name used for stream identification.
GetAggType() string
// GetID returns the unique identifier of this aggregate instance.
GetID() string
// SetID sets the aggregate ID. Typically called during creation.
SetID(string)
// GetVersion returns the current version (number of events applied).
GetVersion() Version
// GetSeq returns the global stream sequence of the last applied event.
GetSeq() uint64
// Create initializes a new aggregate with the given ID.
Create(id string) error
// Register registers event types with the provided Registrar.
Register(r Registrar)
// Raise records an event as uncommitted without applying it.
Raise(event any)
// Apply updates the aggregate state from an event.
Apply(event any) error
// Uncommitted returns a copy of events raised but not yet persisted.
Uncommitted() []any
// ClearUncommitted removes all uncommitted events after successful save.
ClearUncommitted()
// contains filtered or unexported methods
}
Aggregate is the core interface for event-sourced domain objects. It defines the contract that all aggregate roots must implement to work with the Repository for loading and persisting state through events.
An aggregate maintains:
- Identity: type and ID that uniquely identify the aggregate stream
- Version: the current version for optimistic concurrency control
- Sequence: the global stream sequence number of the last applied event
- Uncommitted events: events raised but not yet persisted
The typical lifecycle is:
- Create a new aggregate or load an existing one via Repository
- Execute domain logic that calls Raise() to record events
- Apply() is called to update internal state from each event
- Save via Repository which persists uncommitted events and calls ClearUncommitted()
type AggregateCreatedEvent ¶
func (AggregateCreatedEvent) Validate ¶
func (e AggregateCreatedEvent) Validate() error
type AggregateDeleted ¶
type AggregateDeleted struct{}
AggregateDeleted is an event that marks an aggregate as deleted. Use this as a soft-delete marker that can be detected by projections.
type AggregateOption ¶
type AggregateOption struct {
// contains filtered or unexported fields
}
func WithAggregates ¶
func WithAggregates(a ...Aggregate) AggregateOption
type BaseAggregate ¶
type BaseAggregate struct {
CreatedAt time.Time `json:"created_at"`
// contains filtered or unexported fields
}
BaseAggregate is an embeddable helper that tracks version + uncommitted events.
func (*BaseAggregate) Apply ¶
func (b *BaseAggregate) Apply(evt any) error
func (*BaseAggregate) Checked ¶
func (b *BaseAggregate) Checked(c assert.Cond, thenFunc func() error) error
func (*BaseAggregate) ClearUncommitted ¶
func (b *BaseAggregate) ClearUncommitted()
func (*BaseAggregate) Create ¶
func (b *BaseAggregate) Create(id string) error
func (*BaseAggregate) GetCreatedAt ¶
func (b *BaseAggregate) GetCreatedAt() time.Time
func (*BaseAggregate) GetID ¶
func (b *BaseAggregate) GetID() string
func (*BaseAggregate) GetSeq ¶
func (b *BaseAggregate) GetSeq() uint64
func (*BaseAggregate) GetVersion ¶
func (b *BaseAggregate) GetVersion() Version
func (*BaseAggregate) IsCreated ¶
func (b *BaseAggregate) IsCreated() bool
func (*BaseAggregate) Raise ¶
func (b *BaseAggregate) Raise(event any)
Raise records an event as uncommitted without applying it. (Typically you call Raise and Apply together via a helper such as RaiseAndApply.)
func (*BaseAggregate) SetID ¶
func (b *BaseAggregate) SetID(id string)
func (*BaseAggregate) Uncommitted ¶
func (b *BaseAggregate) Uncommitted() []any
type Checkpoint ¶
type Checkpoint interface {
// GetLastSeq returns the sequence number of the last successfully processed event.
// Returns ErrCheckpointNotFound if no checkpoint exists.
GetLastSeq() (uint64, error)
}
Checkpoint is implemented by handlers that track their processing progress. When a handler implements this interface, the Consumer will use GetLastSeq() to determine where to resume processing after a restart.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer processes events from an EventStore by subscribing to the stream and dispatching events to a Handler. It supports checkpointing for exactly-once processing semantics and live mode detection for distinguishing between historical replay and real-time events.
func NewConsumer ¶
func NewConsumer( store EventStore, decoder Decoder, handler Handler, opts ...ConsumerOption, ) *Consumer
type ConsumerNameOption ¶
type ConsumerNameOption valueOption[string]
func WithConsumerName ¶
func WithConsumerName(name string) ConsumerNameOption
type ConsumerOption ¶
type ConsumerOption interface {
// contains filtered or unexported methods
}
type ConsumerOptions ¶
type ConsumerOptions MultiOption[ConsumerOption]
func WithConsumerOpts ¶
func WithConsumerOpts(opts ...ConsumerOption) ConsumerOptions
type ConsumerShutdownTimeoutOption ¶
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) ConsumerShutdownTimeoutOption
WithShutdownTimeout sets the timeout for handler shutdown when the consumer stops. Default is 5 seconds.
type ContextOption ¶
type ContextOption struct {
// contains filtered or unexported fields
}
type CpStoreOption ¶
type CpStoreOption valueOption[CpStore]
func WithCheckpointStore ¶
func WithCheckpointStore(cps CpStore) CpStoreOption
type DeliverPolicy ¶
type DeliverPolicy string
const (
DeliverAllPolicy DeliverPolicy = "all"
DeliverNewPolicy DeliverPolicy = "new"
)
type ESMetrics ¶
type ESMetrics interface {
// Store operations
StoreLoadDuration(aggType string) metrics.Timer
StoreAppendDuration(aggType string) metrics.Timer
EventsAppended(aggType string, count int)
// Repository operations
RepoLoadDuration(aggType string) metrics.Timer
RepoSaveDuration(aggType string) metrics.Timer
ConcurrencyConflict(aggType string)
// Cache
CacheHit(aggType string)
CacheMiss(aggType string)
// Snapshots
SnapshotLoadDuration(aggType string) metrics.Timer
SnapshotSaveDuration(aggType string) metrics.Timer
// Consumer
ConsumerEventDuration(eventType string, live bool) metrics.Timer
ConsumerEventProcessed(eventType string, live bool, success bool)
ConsumerLag(consumer string, lag int64)
}
ESMetrics defines the metrics interface for the Event Sourcing pillar. Each method either returns a Timer for measuring a duration or records a count or value directly; implementations should be safe for concurrent use.
func NopESMetrics ¶
func NopESMetrics() ESMetrics
NopESMetrics returns a no-op ESMetrics implementation.
type ESMetricsOption ¶
type ESMetricsOption struct {
// contains filtered or unexported fields
}
ESMetricsOption sets the metrics for ES components.
func WithMetrics ¶
func WithMetrics(m ESMetrics) ESMetricsOption
WithMetrics sets the metrics implementation for ES components.
type Env ¶
type Env struct {
// contains filtered or unexported fields
}
func (*Env) NewConsumer ¶
func (e *Env) NewConsumer(handler Handler, opts ...ConsumerOption) *Consumer
func (*Env) Repository ¶
func (e *Env) Repository() Repository
func (*Env) Snapshotter ¶
func (e *Env) Snapshotter() Snapshotter
func (*Env) Store ¶
func (e *Env) Store() EventStore
type EnvConsumerOption ¶
type EnvConsumerOption struct {
// contains filtered or unexported fields
}
func WithConsumer ¶
func WithConsumer(handler Handler, opts ...ConsumerOption) EnvConsumerOption
func WithProjection ¶
func WithProjection(projection Projection, opts ...ConsumerOption) EnvConsumerOption
type EnvOpts ¶
type EnvOpts MultiOption[EnvOption]
func WithEnvOpts ¶
func WithEnvOpts(opts ...EnvOption) EnvOpts
type Envelope ¶
type Envelope struct {
// ID is the unique identifier of this event envelope.
ID string `json:"id"`
// Seq is the global sequence number assigned by the store.
// This provides total ordering across all events in the store.
Seq uint64 `json:"seq"`
// Version is the per-aggregate stream version (1, 2, 3, ...).
// Used for optimistic concurrency control.
Version Version `json:"version"`
// AggregateType identifies the type of aggregate this event belongs to.
AggregateType string `json:"aggregate"`
// AggregateID identifies the specific aggregate instance.
AggregateID string `json:"aggregate_id"`
// Type is the event type name for deserialization routing.
Type string `json:"type"`
// OccurredAt is when the event was created.
OccurredAt time.Time `json:"occurred_at"`
// Data contains the JSON-encoded event payload.
Data json.RawMessage `json:"data"`
}
Envelope wraps an event with metadata for persistence and routing. It is the unit of storage in the EventStore and contains all information needed to reconstruct and route events during replay or consumption.
type EventRegisterOption ¶
type EventRegisterOption struct {
// contains filtered or unexported fields
}
type EventRegistry ¶
type EventRegistry struct {
// contains filtered or unexported fields
}
EventRegistry maps event type names to constructors so we can decode persisted events.
func NewRegistry ¶
func NewRegistry() *EventRegistry
func (*EventRegistry) Register ¶
func (r *EventRegistry) Register(eventType string, ctor func() any)
type EventStore ¶
type EventStore interface {
// Stream provides subscription capabilities for consumers.
Stream
// Load retrieves events for a specific aggregate stream.
// Events are returned in version order. Use opts to filter by version/sequence.
Load(ctx context.Context, aggType string, aggID string, opts ...StoreLoadOption) ([]Envelope, error)
// Append persists events to an aggregate stream with optimistic concurrency.
// Returns ErrConcurrencyConflict if expectedVersion doesn't match the current version.
Append(ctx context.Context, aggType string, aggID string, expectedVersion Version, events []Envelope) (*StoreAppendResult, error)
}
EventStore is the persistence interface for event sourcing. It provides operations for loading and appending events to aggregate streams, as well as subscribing to the global event stream for consumers.
Implementations must guarantee:
- Atomic append of multiple events within a single Append call
- Optimistic concurrency via expectedVersion check
- Ordered delivery of events by sequence number in subscriptions
type HandleFunc ¶
func Handle ¶
func Handle(f HandleFunc) HandleFunc
func (HandleFunc) Handle ¶
func (f HandleFunc) Handle(ctx MsgCtx) error
type HandlerLifecycleStart ¶
type HandlerMiddleware ¶
func MiddlewareHandle ¶
func MiddlewareHandle(mw MiddlewareHandleFunc) HandlerMiddleware
func NewCheckpointMiddleware ¶
func NewCheckpointMiddleware(cp CpStore) HandlerMiddleware
func NewLogMiddleware ¶
func NewLogMiddleware(attrs ...any) HandlerMiddleware
type IDGenerator ¶
type IDGenerator func() string
IDGenerator is a function that generates unique IDs for events.
func DefaultIDGenerator ¶
func DefaultIDGenerator() IDGenerator
DefaultIDGenerator returns the default ID generator using nanoid.
type InMemAggCpStore ¶
type InMemAggCpStore struct {
// contains filtered or unexported fields
}
func NewInMemAggCpStore ¶
func NewInMemAggCpStore() *InMemAggCpStore
type InMemCpStore ¶
type InMemCpStore struct {
// contains filtered or unexported fields
}
func NewInMemCpStore ¶
func NewInMemCpStore() *InMemCpStore
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
InMemoryStore is a simple, correct (optimistic) store for tests/dev.
func NewInMemoryStore ¶
func NewInMemoryStore() *InMemoryStore
func (*InMemoryStore) Append ¶
func (s *InMemoryStore) Append( _ context.Context, aggType string, aggID string, expectVersion Version, events []Envelope, ) (*StoreAppendResult, error)
func (*InMemoryStore) Load ¶
func (s *InMemoryStore) Load( _ context.Context, aggType, aggID string, opts ...StoreLoadOption, ) ([]Envelope, error)
func (*InMemoryStore) Subscribe ¶
func (s *InMemoryStore) Subscribe(ctx context.Context, opts ...SubscribeOption) (Subscription, error)
type KeyValueSnapshotter ¶
type KeyValueSnapshotter struct {
// contains filtered or unexported fields
}
func NewInMemorySnapshotter ¶
func NewInMemorySnapshotter() *KeyValueSnapshotter
func NewKeyValueSnapshotter ¶
func NewKeyValueSnapshotter(store kv.Store) *KeyValueSnapshotter
func (*KeyValueSnapshotter) LoadSnapshot ¶
func (*KeyValueSnapshotter) SaveSnapshot ¶
func (k *KeyValueSnapshotter) SaveSnapshot(ctx context.Context, snapshot Snapshot, opts SnapshotSaveOpts) error
type LoadAndSaveOption ¶
type LoadAndSaveOption interface {
// contains filtered or unexported methods
}
type LoadOption ¶
type LoadOption interface {
// contains filtered or unexported methods
}
type LoadOptsOption ¶
type LoadOptsOption MultiOption[LoadOption]
func WithLoadOpts ¶
func WithLoadOpts(opts ...LoadOption) LoadOptsOption
type MiddlewareHandleFunc ¶
type MiddlewareOption ¶
type MiddlewareOption valueOption[[]HandlerMiddleware]
func WithMiddlewares ¶
func WithMiddlewares(mws ...HandlerMiddleware) MiddlewareOption
func WithMiddlewaresAppend ¶
func WithMiddlewaresAppend(mws ...HandlerMiddleware) MiddlewareOption
type MsgCtx ¶
type MsgCtx struct {
// contains filtered or unexported fields
}
MsgCtx provides context for handling a single event. It wraps the event envelope with additional metadata about the processing context, including whether the consumer is in live mode (processing real-time events) or catching up on historical events.
func (*MsgCtx) AggregateID ¶
func (*MsgCtx) AggregateType ¶
func (*MsgCtx) Data ¶
func (c *MsgCtx) Data() json.RawMessage
func (*MsgCtx) OccurredAt ¶
type MultiOption ¶
type MultiOption[T any] struct {
// contains filtered or unexported fields
}
type Projection ¶
Projection consumes persisted events to build read models / indexes.
type RepoCacheOption ¶
func WithRepoCache ¶
func WithRepoCache(cache cache.Cache) RepoCacheOption
func WithRepoCacheLRU ¶
func WithRepoCacheLRU(size int) RepoCacheOption
type RepoCreateOption ¶
type RepoCreateOption valueOption[bool]
func WithCreate ¶
func WithCreate() RepoCreateOption
type RepoIDGeneratorOption ¶
type RepoIDGeneratorOption valueOption[IDGenerator]
func WithIDGenerator ¶
func WithIDGenerator(gen IDGenerator) RepoIDGeneratorOption
WithIDGenerator sets a custom ID generator for event envelope IDs.
type RepoUseCacheOption ¶
type RepoUseCacheOption valueOption[bool]
func WithUseCache ¶
func WithUseCache(useCache bool) RepoUseCacheOption
type Repository ¶
type Repository interface {
Load(ctx context.Context, agg Aggregate, opts ...LoadOption) error
Save(ctx context.Context, agg Aggregate, opts ...SaveOption) error
CreateSnapshot(ctx context.Context, agg Aggregate, saveSnapshotOpts SnapshotSaveOpts) (Snapshot, error)
}
func NewRepository ¶
func NewRepository( log *slog.Logger, store EventStore, registry *EventRegistry, opts ...RepositoryOption, ) Repository
type RepositoryOption ¶
type RepositoryOption interface {
// contains filtered or unexported methods
}
type SaveOption ¶
type SaveOption interface {
// contains filtered or unexported methods
}
type SaveOptsOption ¶
type SaveOptsOption MultiOption[SaveOption]
func WithSaveOpts ¶
func WithSaveOpts(opts ...SaveOption) SaveOptsOption
type Snapshot ¶
type Snapshot struct {
// SnapshotID is the unique identifier of this snapshot.
SnapshotID string `json:"snapshot_id"`
// ObjID is the identifier of the snapshotted object (aggregate ID or projection name).
ObjID string `json:"obj_id"`
// ObjType is the type of the snapshotted object (aggregate type or "projection").
ObjType string `json:"obj_type"`
// ObjVersion is the version of the object at the time of snapshot.
ObjVersion Version `json:"obj_version"`
// StreamSeq is the global stream sequence number at the time of snapshot.
// When restoring, events after this sequence are replayed.
StreamSeq uint64 `json:"stream_seq"`
// CreatedAt is when this snapshot was created.
CreatedAt time.Time `json:"created_at"`
// SchemaVersion tracks the snapshot format for migrations.
SchemaVersion int `json:"schema_version"`
// Encoding indicates how Data is encoded (typically "json").
Encoding string `json:"encoding"`
// Data contains the serialized state.
Data []byte `json:"data"`
}
Snapshot represents a point-in-time capture of an aggregate or projection state. Snapshots optimize loading by allowing the system to restore state directly instead of replaying all events from the beginning.
func CreateSnapshot ¶
func LoadSnapshot ¶
type SnapshotOption ¶
type SnapshotOption valueOption[bool]
func WithSnapshot ¶
func WithSnapshot(b bool) SnapshotOption
type SnapshotProjection ¶
type SnapshotProjection[T SnapshottableProjection] struct {
// contains filtered or unexported fields
}
SnapshotProjection wraps a SnapshottableProjection and provides automatic snapshotting at configurable intervals during live mode.
func NewSnapshotProjection ¶
func NewSnapshotProjection[T SnapshottableProjection]( log *slog.Logger, innerProjection T, snapshotter Snapshotter, opts ...SnapshotProjectionOption, ) (*SnapshotProjection[T], error)
func (*SnapshotProjection[T]) GetLastSeq ¶
func (p *SnapshotProjection[T]) GetLastSeq() (uint64, error)
func (*SnapshotProjection[T]) Handle ¶
func (p *SnapshotProjection[T]) Handle(msgCtx MsgCtx) error
func (*SnapshotProjection[T]) Name ¶
func (p *SnapshotProjection[T]) Name() string
func (*SnapshotProjection[T]) Projection ¶
func (p *SnapshotProjection[T]) Projection() T
type SnapshotProjectionOption ¶
type SnapshotProjectionOption interface {
// contains filtered or unexported methods
}
SnapshotProjectionOption configures a SnapshotProjection.
func WithSnapshotFrequency ¶
func WithSnapshotFrequency(n uint64) SnapshotProjectionOption
WithSnapshotFrequency sets how often (in events) snapshots are taken in live mode. Default is 10 events.
type SnapshotSaveOpts ¶
type SnapshotSaveOpts struct {
// TTL sets the time-to-live for the snapshot. Zero means no expiration.
TTL time.Duration
}
SnapshotSaveOpts configures snapshot persistence.
type SnapshotTTLOption ¶
func WithSnapshotTTL ¶
func WithSnapshotTTL(ttl time.Duration) SnapshotTTLOption
type Snapshottable ¶
type Snapshottable interface {
// Snapshot serializes the current state to bytes.
Snapshot() (data []byte, err error)
// RestoreSnapshot restores state from previously serialized bytes.
RestoreSnapshot(data []byte) error
}
Snapshottable is implemented by types that support custom snapshot serialization. If not implemented, JSON marshaling is used as a fallback.
type SnapshottableProjection ¶
type SnapshottableProjection interface {
Projection
Snapshottable
}
type Snapshotter ¶
type Snapshotter interface {
// SaveSnapshot persists a snapshot with optional TTL.
SaveSnapshot(ctx context.Context, snapshot Snapshot, opts SnapshotSaveOpts) error
// LoadSnapshot retrieves the latest snapshot for an object.
// Returns ErrSnapshotNotFound if no snapshot exists.
LoadSnapshot(ctx context.Context, objType, objID string) (Snapshot, error)
}
Snapshotter provides storage operations for snapshots.
type SnapshotterOption ¶
type SnapshotterOption valueOption[Snapshotter]
func WithSnapshotter ¶
func WithSnapshotter(s Snapshotter) SnapshotterOption
type StartSeqOption ¶
type StartSeqOption valueOption[uint64]
func WithStartSeq ¶
func WithStartSeq(startSeq uint64) StartSeqOption
func (StartSeqOption) ApplyToStoreLoadOptions ¶
func (o StartSeqOption) ApplyToStoreLoadOptions(receiver storeLoadOptionsReceiver)
type StoreAppendResult ¶
type StoreAppendResult struct {
// LastSeq is the global sequence number assigned to the last appended event.
LastSeq uint64
}
StoreAppendResult contains the result of appending events to the store.
func AppendEvents ¶
func AppendEvents( ctx context.Context, store EventStore, aggType string, aggID string, expect Version, events ...any, ) (*StoreAppendResult, error)
type StoreLoadOption ¶
type StoreLoadOption interface {
ApplyToStoreLoadOptions(storeLoadOptionsReceiver)
}
func WithStartAtVersion ¶
func WithStartAtVersion(startVersion Version) StoreLoadOption
type StoreOption ¶
type StoreOption valueOption[EventStore]
func WithStore ¶
func WithStore(s EventStore) StoreOption
type Stream ¶
type Stream interface {
Subscribe(ctx context.Context, opts ...SubscribeOption) (Subscription, error)
}
type SubscribeFilter ¶
type SubscribeOption ¶
type SubscribeOption func(opts *SubscribeOpts)
func WithDeliverPolicy ¶
func WithDeliverPolicy(policy DeliverPolicy) SubscribeOption
func WithFilters ¶
func WithFilters(filters ...SubscribeFilter) SubscribeOption
func WithStartSequence ¶
func WithStartSequence(startSequence uint64) SubscribeOption
type SubscribeOpts ¶
type SubscribeOpts struct {
// contains filtered or unexported fields
}
func NewSubscribeOpts ¶
func NewSubscribeOpts(opts ...SubscribeOption) SubscribeOpts
func (*SubscribeOpts) DeliverPolicy ¶
func (s *SubscribeOpts) DeliverPolicy() DeliverPolicy
func (*SubscribeOpts) Filters ¶
func (s *SubscribeOpts) Filters() []SubscribeFilter
func (*SubscribeOpts) StartSequence ¶
func (s *SubscribeOpts) StartSequence() uint64
type Subscription ¶
type TestingEnv ¶
type TestingEnv struct {
*Env
// contains filtered or unexported fields
}
func StartTestEnv ¶
func StartTestEnv( t *testing.T, opts ...EnvOption, ) *TestingEnv
func (*TestingEnv) Assert ¶
func (e *TestingEnv) Assert() *TestingEnvAssert
type TestingEnvAssert ¶
type TestingEnvAssert struct {
// contains filtered or unexported fields
}
type TypedRepository ¶
type TypedRepository[T Aggregate] interface {
GetAggType() string
New() T
NewWithID(id string) T
Create(ctx context.Context, aggID string, opts ...SaveOption) (agg T, err error)
GetOrCreate(ctx context.Context, aggID string, opts ...LoadAndSaveOption) (agg T, err error)
// GetByID gets an aggregate by ID. If the aggregate does not exist, it is created.
GetByID(ctx context.Context, aggID string, opts ...LoadOption) (T, error)
WithTransaction(ctx context.Context, aggID string, do func(T) error, opts ...WithTransactionOption) error
Save(ctx context.Context, agg T, opts ...SaveOption) error
}
func NewTypedRepository ¶
func NewTypedRepository[T Aggregate](log *slog.Logger, s EventStore, reg *EventRegistry, opts ...RepositoryOption) TypedRepository[T]
func NewTypedRepositoryFrom ¶
func NewTypedRepositoryFrom[T Aggregate](log *slog.Logger, r Repository, opts ...RepositoryOption) TypedRepository[T]
type Version ¶
type Version uint64
Version represents the version number of an aggregate within its stream. It is a monotonically increasing value starting from 1 for the first event. Version is used for optimistic concurrency control - when saving changes, the expected version must match the current version in the store.
type WithTransactionOption ¶
type WithTransactionOption interface {
// contains filtered or unexported methods
}