Documentation ¶
Overview ¶
Package mink provides event sourcing and CQRS primitives for Go applications. It offers a simple, flexible API for building event-sourced systems with support for multiple database backends.
go-mink is an Event Sourcing library for Go that makes it easy to build applications using event sourcing patterns. It provides a simple API for storing events, loading aggregates, and projecting read models.
Quick Start ¶
Create an event store with the in-memory adapter for development:
import (
	"go-mink.dev"
	"go-mink.dev/adapters/memory"
)

store := mink.New(memory.NewAdapter())
For production, use the PostgreSQL adapter:
import (
	"go-mink.dev"
	"go-mink.dev/adapters/postgres"
)

adapter, err := postgres.NewAdapter(ctx, connStr)
if err != nil {
	log.Fatal(err)
}
store := mink.New(adapter)
Defining Events ¶
Events are simple structs that represent something that happened in your domain:
type OrderCreated struct {
	OrderID    string `json:"orderId"`
	CustomerID string `json:"customerId"`
}

type ItemAdded struct {
	OrderID  string  `json:"orderId"`
	SKU      string  `json:"sku"`
	Quantity int     `json:"quantity"`
	Price    float64 `json:"price"`
}
Register events with the store so they can be serialized and deserialized:
store.RegisterEvents(OrderCreated{}, ItemAdded{})
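To illustrate why registration is needed, here is a minimal sketch of a type registry built only on the standard library. It is not mink's implementation: the registry type and its register and decode methods are hypothetical names, and the sketch assumes the struct name is used as the event type name (as GetEventType is documented to do).

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type OrderCreated struct {
	OrderID    string `json:"orderId"`
	CustomerID string `json:"customerId"`
}

// registry is a hypothetical stand-in for an event registry:
// it maps an event type name to the Go type to decode into.
type registry map[string]reflect.Type

// register records each example value under its struct name.
func (r registry) register(examples ...interface{}) {
	for _, ex := range examples {
		t := reflect.TypeOf(ex)
		r[t.Name()] = t
	}
}

// decode rebuilds a typed event from its stored name and JSON payload.
func (r registry) decode(name string, data []byte) (interface{}, error) {
	t, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("event type %q not registered", name)
	}
	ptr := reflect.New(t) // pointer to a zero value of the registered type
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil
}

func main() {
	r := registry{}
	r.register(OrderCreated{})

	ev, err := r.decode("OrderCreated", []byte(`{"orderId":"123","customerId":"456"}`))
	fmt.Println(ev, err)

	// An unregistered name cannot be decoded into a concrete type.
	_, err = r.decode("ItemAdded", []byte(`{}`))
	fmt.Println(err)
}
```

Without the registry entry, stored JSON can only be decoded into a generic map, which is why unknown types surface as an error rather than a typed event.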
Defining Aggregates ¶
Aggregates are domain objects that encapsulate business logic and generate events:
type Order struct {
	mink.AggregateBase
	CustomerID string
	Items      []OrderItem
	Status     string
}

func NewOrder(id string) *Order {
	return &Order{
		AggregateBase: mink.NewAggregateBase(id, "Order"),
	}
}

func (o *Order) Create(customerID string) {
	o.Apply(OrderCreated{OrderID: o.AggregateID(), CustomerID: customerID})
	o.CustomerID = customerID
	o.Status = "Created"
}

func (o *Order) ApplyEvent(event interface{}) error {
	switch e := event.(type) {
	case OrderCreated:
		o.CustomerID = e.CustomerID
		o.Status = "Created"
	case ItemAdded:
		o.Items = append(o.Items, OrderItem{SKU: e.SKU, Quantity: e.Quantity, Price: e.Price})
	}
	// NOTE: Version is managed automatically by LoadAggregate and SaveAggregate.
	// You do NOT need to call IncrementVersion() here.
	return nil
}
Saving and Loading Aggregates ¶
Save aggregates to persist their uncommitted events:
order := NewOrder("order-123")
order.Create("customer-456")
order.AddItem("SKU-001", 2, 29.99)
err := store.SaveAggregate(ctx, order)
Load aggregates to rebuild state from events:
loaded := NewOrder("order-123")
err := store.LoadAggregate(ctx, loaded)
// loaded.Status == "Created"
// loaded.Items contains the added item
// loaded.Version() returns the number of events in the stream
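The examples above call order.AddItem and refer to OrderItem without showing them. The following self-contained sketch shows one way the pieces could fit together, with no dependency on mink: the pending slice stands in for the uncommitted events an aggregate base would track, and record, applyEvent, and replay are hypothetical helper names. The error return on AddItem is also an assumption made to show a business rule.

```go
package main

import "fmt"

type OrderCreated struct{ OrderID, CustomerID string }

type ItemAdded struct {
	OrderID  string
	SKU      string
	Quantity int
	Price    float64
}

type OrderItem struct {
	SKU      string
	Quantity int
	Price    float64
}

// Order is a simplified aggregate. pending stands in for the
// uncommitted events that an aggregate base would track.
type Order struct {
	ID         string
	CustomerID string
	Status     string
	Items      []OrderItem
	pending    []interface{}
}

// applyEvent only mutates state; commands and replay both go through it.
func (o *Order) applyEvent(event interface{}) {
	switch e := event.(type) {
	case OrderCreated:
		o.CustomerID = e.CustomerID
		o.Status = "Created"
	case ItemAdded:
		o.Items = append(o.Items, OrderItem{SKU: e.SKU, Quantity: e.Quantity, Price: e.Price})
	}
}

// record queues a new event for saving and applies it to current state.
func (o *Order) record(event interface{}) {
	o.pending = append(o.pending, event)
	o.applyEvent(event)
}

func (o *Order) Create(customerID string) {
	o.record(OrderCreated{OrderID: o.ID, CustomerID: customerID})
}

// AddItem validates the request before emitting an event.
func (o *Order) AddItem(sku string, quantity int, price float64) error {
	if quantity <= 0 {
		return fmt.Errorf("quantity must be positive, got %d", quantity)
	}
	o.record(ItemAdded{OrderID: o.ID, SKU: sku, Quantity: quantity, Price: price})
	return nil
}

// replay rebuilds an order from stored history without queuing new events.
func replay(id string, history []interface{}) *Order {
	o := &Order{ID: id}
	for _, e := range history {
		o.applyEvent(e)
	}
	return o
}

func main() {
	order := &Order{ID: "order-123"}
	order.Create("customer-456")
	if err := order.AddItem("SKU-001", 2, 29.99); err != nil {
		fmt.Println("add item:", err)
	}

	// Saving would persist order.pending; loading replays the same events.
	loaded := replay("order-123", order.pending)
	fmt.Println(loaded.Status, len(loaded.Items))
}
```

Routing every state change through a single apply function keeps the state produced by a command identical to the state produced by replaying its events.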
Low-Level Event Operations ¶
Append events directly to a stream:
events := []interface{}{
	OrderCreated{OrderID: "123", CustomerID: "456"},
	ItemAdded{OrderID: "123", SKU: "SKU-001", Quantity: 2, Price: 29.99},
}
err := store.Append(ctx, "Order-123", events)
Load events from a stream:
events, err := store.Load(ctx, "Order-123")
Optimistic Concurrency ¶
Use expected versions to prevent concurrent modifications:
// Create a new stream (must not exist)
err := store.Append(ctx, "Order-123", events, mink.ExpectVersion(mink.NoStream))

// Append to an existing stream at a specific version
err = store.Append(ctx, "Order-123", events, mink.ExpectVersion(1))
Version constants:
- AnyVersion (-1): Skip version check
- NoStream (0): Stream must not exist
- StreamExists (-2): Stream must exist
Metadata ¶
Add metadata to events for tracing and multi-tenancy:
metadata := mink.Metadata{}.
	WithUserID("user-123").
	WithCorrelationID("corr-456").
	WithTenantID("tenant-789")
err := store.Append(ctx, "Order-123", events, mink.WithAppendMetadata(metadata))
Commands and CQRS (v0.2.0) ¶
Define commands to encapsulate user intentions:
type CreateOrder struct {
	mink.CommandBase
	CustomerID string `json:"customerId"`
}

func (c CreateOrder) CommandType() string { return "CreateOrder" }

func (c CreateOrder) Validate() error {
	if c.CustomerID == "" {
		return mink.NewValidationError("CustomerID", "required")
	}
	return nil
}
Create a command bus with middleware:
bus := mink.NewCommandBus()
bus.Use(mink.ValidationMiddleware())
bus.Use(mink.RecoveryMiddleware())
bus.Use(mink.LoggingMiddleware(logger, nil))
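As a rough picture of what a middleware chain does, the sketch below wraps a handler in two layers using only the standard library. The handler and middleware types, chain, and tag are hypothetical and much simpler than mink's types; the sketch also assumes that the first middleware added runs outermost, which the source does not state.

```go
package main

import "fmt"

// handler processes a command, identified here only by its type name.
type handler func(cmdType string) error

// middleware wraps a handler with extra behavior.
type middleware func(next handler) handler

// chain wraps h so that the first middleware in the list runs outermost.
func chain(h handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that records its name before calling next.
func tag(name string, trace *[]string) middleware {
	return func(next handler) handler {
		return func(cmdType string) error {
			*trace = append(*trace, name)
			return next(cmdType)
		}
	}
}

func main() {
	var trace []string
	final := func(cmdType string) error {
		trace = append(trace, "handler:"+cmdType)
		return nil
	}

	h := chain(final, tag("validation", &trace), tag("logging", &trace))
	if err := h("CreateOrder"); err != nil {
		fmt.Println("dispatch failed:", err)
	}
	fmt.Println(trace) // prints "[validation logging handler:CreateOrder]"
}
```

Under this ordering, a validation layer placed first can reject a command before any later layer or the handler runs.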
Register command handlers:
bus.RegisterFunc("CreateOrder", func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
	c := cmd.(CreateOrder)
	order := NewOrder(uuid.New().String())
	order.Create(c.CustomerID)
	if err := store.SaveAggregate(ctx, order); err != nil {
		return mink.NewErrorResult(err), err
	}
	return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
})
Dispatch commands:
result, err := bus.Dispatch(ctx, CreateOrder{CustomerID: "cust-123"})
Idempotency ¶
Prevent duplicate command processing with idempotency:
idempotencyStore := memory.NewIdempotencyStore()
config := mink.DefaultIdempotencyConfig(idempotencyStore)
bus.Use(mink.IdempotencyMiddleware(config))
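Make commands idempotent by implementing IdempotentCommand (this example assumes CreateOrder also carries a RequestID field, which the struct shown earlier does not define):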
func (c CreateOrder) IdempotencyKey() string { return c.RequestID }
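The effect of an idempotency layer can be sketched with a small in-memory store. This is an illustration under stated assumptions, not mink's middleware: handler, resultStore, and idempotent are hypothetical names, commands are reduced to their idempotency key, and the sketch chooses not to record failures so that a failed command can be retried.

```go
package main

import (
	"fmt"
	"sync"
)

// handler processes a command identified by its idempotency key.
type handler func(key string) (string, error)

// resultStore is a hypothetical in-memory store of results by key.
type resultStore struct {
	mu      sync.Mutex
	results map[string]string
}

func newResultStore() *resultStore {
	return &resultStore{results: make(map[string]string)}
}

// idempotent wraps next so that a repeated key returns the stored result
// instead of running the handler again. The lock is held while the handler
// runs, which keeps the sketch simple but serializes all commands.
func idempotent(store *resultStore, next handler) handler {
	return func(key string) (string, error) {
		store.mu.Lock()
		defer store.mu.Unlock()

		if res, ok := store.results[key]; ok {
			return res, nil
		}
		res, err := next(key)
		if err != nil {
			return "", err // not recorded, so a retry runs the handler again
		}
		store.results[key] = res
		return res, nil
	}
}

func main() {
	calls := 0
	h := idempotent(newResultStore(), func(key string) (string, error) {
		calls++
		return "order-1", nil
	})

	first, _ := h("req-1")
	second, _ := h("req-1") // duplicate request
	fmt.Println(first, second, "handler calls:", calls)
}
```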
Index ¶
- Constants
- Variables
- func BuildStreamID(aggregateType, aggregateID string) string
- func CausationIDFromContext(ctx context.Context) string
- func CorrelationIDFromContext(ctx context.Context) string
- func GenerateIdempotencyKey(cmd Command) string
- func GetCommandType(cmd interface{}) string
- func GetEncryptedFields(m Metadata) []string
- func GetEncryptionKeyID(m Metadata) string
- func GetEventType(event interface{}) string
- func GetIdempotencyKey(cmd Command) string
- func GetSchemaVersion(m Metadata) int
- func IdempotencyKeyFromField(fieldGetter func(Command) string) func(Command) string
- func IdempotencyKeyPrefix(prefix string) func(Command) string
- func IsEncrypted(m Metadata) bool
- func RegisterGenericHandler[C Command](registry *HandlerRegistry, ...)
- func SagaStateToJSON(state *SagaState) ([]byte, error)
- func ShouldHandleEventType(handledEvents []string, eventType string) bool
- func TenantIDFromContext(ctx context.Context) string
- func Version() string
- func WithCausationID(ctx context.Context, causationID string) context.Context
- func WithTenantID(ctx context.Context, tenantID string) context.Context
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) AggregateID() string
- func (a *AggregateBase) AggregateType() string
- func (a *AggregateBase) Apply(event interface{})
- func (a *AggregateBase) ClearUncommittedEvents()
- func (a *AggregateBase) HasUncommittedEvents() bool
- func (a *AggregateBase) IncrementVersion()
- func (a *AggregateBase) SetID(id string)
- func (a *AggregateBase) SetType(t string)
- func (a *AggregateBase) SetVersion(v int64)
- func (a *AggregateBase) StreamID() StreamID
- func (a *AggregateBase) UncommittedEvents() []interface{}
- func (a *AggregateBase) Version() int64
- type AggregateCommand
- type AggregateFactory
- type AggregateHandler
- type AggregateHandlerConfig
- type AggregateRoot
- type AppendOption
- type AsyncOptions
- type AsyncProjection
- type AsyncProjectionBase
- type AsyncResult
- type CatchupSubscription
- type CategoryFilter
- type Checkpoint
- type CheckpointStore
- type Clearable
- type Command
- type CommandBase
- func (c CommandBase) GetCausationID() string
- func (c CommandBase) GetCommandID() string
- func (c CommandBase) GetCorrelationID() string
- func (c CommandBase) GetMetadata(key string) string
- func (c CommandBase) WithCausationID(id string) CommandBase
- func (c CommandBase) WithCommandID(id string) CommandBase
- func (c CommandBase) WithCorrelationID(id string) CommandBase
- func (c CommandBase) WithMetadata(key, value string) CommandBase
- type CommandBus
- func (b *CommandBus) Close() error
- func (b *CommandBus) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
- func (b *CommandBus) DispatchAll(ctx context.Context, cmds ...Command) ([]DispatchResult, error)
- func (b *CommandBus) DispatchAsync(ctx context.Context, cmd Command) <-chan DispatchResult
- func (b *CommandBus) HandlerCount() int
- func (b *CommandBus) HasHandler(cmdType string) bool
- func (b *CommandBus) IsClosed() bool
- func (b *CommandBus) MiddlewareCount() int
- func (b *CommandBus) Register(handler CommandHandler)
- func (b *CommandBus) RegisterFunc(cmdType string, ...)
- func (b *CommandBus) Use(middleware ...Middleware)
- type CommandBusOption
- type CommandContext
- func (c *CommandContext) Get(key string) (interface{}, bool)
- func (c *CommandContext) GetString(key string) string
- func (c *CommandContext) Set(key string, value interface{})
- func (c *CommandContext) SetError(err error)
- func (c *CommandContext) SetResult(result CommandResult)
- func (c *CommandContext) SetSuccess(aggregateID string, version int64)
- type CommandDispatcher
- type CommandHandler
- type CommandHandlerFunc
- type CommandResult
- type CompositeFilter
- type ConcurrencyError
- type ContextValueMiddleware
- type DataExporter
- type DataExporterOption
- type DispatchResult
- type EncryptionError
- type EncryptionOption
- func WithDecryptionErrorHandler(handler func(err error, eventType string, metadata Metadata) error) EncryptionOption
- func WithDefaultKeyID(keyID string) EncryptionOption
- func WithEncryptedFields(eventType string, fields ...string) EncryptionOption
- func WithEncryptionProvider(p encryption.Provider) EncryptionOption
- func WithTenantKeyResolver(resolver func(tenantID string) string) EncryptionOption
- type Event
- type EventApplier
- type EventData
- type EventFilter
- type EventRegistry
- type EventStore
- func (s *EventStore) Adapter() adapters.EventStoreAdapter
- func (s *EventStore) Append(ctx context.Context, streamID string, events []interface{}, ...) error
- func (s *EventStore) Close() error
- func (s *EventStore) GetLastPosition(ctx context.Context) (uint64, error)
- func (s *EventStore) GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)
- func (s *EventStore) Initialize(ctx context.Context) error
- func (s *EventStore) Load(ctx context.Context, streamID string) ([]Event, error)
- func (s *EventStore) LoadAggregate(ctx context.Context, agg Aggregate) error
- func (s *EventStore) LoadEventsFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)
- func (s *EventStore) LoadFrom(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)
- func (s *EventStore) LoadRaw(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)
- func (s *EventStore) ProcessStoredEvent(ctx context.Context, stored StoredEvent) (Event, error)
- func (s *EventStore) RegisterEvents(events ...interface{})
- func (s *EventStore) RegisterUpcasters(upcasters ...Upcaster) error
- func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error
- func (s *EventStore) Serializer() Serializer
- type EventStoreWithOutbox
- func (es *EventStoreWithOutbox) Append(ctx context.Context, streamID string, events []interface{}, ...) error
- func (es *EventStoreWithOutbox) OutboxStore() OutboxStore
- func (es *EventStoreWithOutbox) SaveAggregate(ctx context.Context, agg Aggregate) error
- func (es *EventStoreWithOutbox) Store() *EventStore
- type EventSubscriber
- type EventTypeFilter
- type EventTypeNotRegisteredError
- type ExportError
- type ExportFilter
- func CombineFilters(filters ...ExportFilter) ExportFilter
- func FilterByEventTypes(types ...string) ExportFilter
- func FilterByMetadata(key, value string) ExportFilter
- func FilterByStreamPrefix(prefix string) ExportFilter
- func FilterByTenantID(tenantID string) ExportFilter
- func FilterByUserID(userID string) ExportFilter
- type ExportHandler
- type ExportRequest
- type ExportResult
- type ExportedEvent
- type ExportedMetadata
- type FieldDefinition
- type FieldEncryptionConfig
- type Filter
- type FilterOp
- type GenericHandler
- type HandlerNotFoundError
- type HandlerRegistry
- func (r *HandlerRegistry) Clear()
- func (r *HandlerRegistry) CommandTypes() []string
- func (r *HandlerRegistry) Count() int
- func (r *HandlerRegistry) Get(cmdType string) CommandHandler
- func (r *HandlerRegistry) Has(cmdType string) bool
- func (r *HandlerRegistry) Register(handler CommandHandler)
- func (r *HandlerRegistry) RegisterFunc(cmdType string, ...)
- func (r *HandlerRegistry) Remove(cmdType string)
- type IdempotencyConfig
- type IdempotencyRecord
- type IdempotencyReplayError
- type IdempotencyStore
- type IdempotentCommand
- type InMemoryRepository
- func (r *InMemoryRepository[T]) Clear(ctx context.Context) error
- func (r *InMemoryRepository[T]) Count(ctx context.Context, query Query) (int64, error)
- func (r *InMemoryRepository[T]) Delete(ctx context.Context, id string) error
- func (r *InMemoryRepository[T]) DeleteMany(ctx context.Context, query Query) (int64, error)
- func (r *InMemoryRepository[T]) Exists(ctx context.Context, id string) (bool, error)
- func (r *InMemoryRepository[T]) Find(ctx context.Context, query Query) ([]*T, error)
- func (r *InMemoryRepository[T]) FindOne(ctx context.Context, query Query) (*T, error)
- func (r *InMemoryRepository[T]) Get(ctx context.Context, id string) (*T, error)
- func (r *InMemoryRepository[T]) GetAll(ctx context.Context) ([]*T, error)
- func (r *InMemoryRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
- func (r *InMemoryRepository[T]) Insert(ctx context.Context, model *T) error
- func (r *InMemoryRepository[T]) Len() int
- func (r *InMemoryRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error
- func (r *InMemoryRepository[T]) Upsert(ctx context.Context, model *T) error
- type IncompatibleSchemaError
- type InlineProjection
- type JSONSerializer
- func (s *JSONSerializer) Deserialize(data []byte, eventType string) (interface{}, error)
- func (s *JSONSerializer) Register(eventType string, example interface{})
- func (s *JSONSerializer) RegisterAll(examples ...interface{})
- func (s *JSONSerializer) Registry() *EventRegistry
- func (s *JSONSerializer) Serialize(event interface{}) ([]byte, error)
- type KeyNotFoundError
- type KeyRevokedError
- type LiveOptions
- type LiveProjection
- type LiveProjectionBase
- type Logger
- type LoggingMiddleware
- type Metadata
- type MetricsCollector
- type Middleware
- func CausationIDMiddleware() Middleware
- func ChainMiddleware(middleware ...Middleware) Middleware
- func CommandTypeMiddleware(types []string, middleware Middleware) Middleware
- func ConditionalMiddleware(condition func(Command) bool, middleware Middleware) Middleware
- func CorrelationIDMiddleware(generator func() string) Middleware
- func IdempotencyMiddleware(config IdempotencyConfig) Middleware
- func MetricsMiddleware(collector MetricsCollector) Middleware
- func RecoveryMiddleware() Middleware
- func RetryMiddleware(config RetryConfig) Middleware
- func TenantMiddleware(extractor func(Command) string, required bool) Middleware
- func TimeoutMiddleware(timeout time.Duration) Middleware
- func ValidationMiddleware() Middleware
- type MiddlewareFunc
- type MultiValidationError
- func (e *MultiValidationError) Add(err *ValidationError)
- func (e *MultiValidationError) AddField(field, message string)
- func (e *MultiValidationError) Error() string
- func (e *MultiValidationError) HasErrors() bool
- func (e *MultiValidationError) Is(target error) bool
- func (e *MultiValidationError) Unwrap() error
- type Option
- type OrderBy
- type OutboxMessage
- type OutboxMetrics
- type OutboxOption
- type OutboxProcessor
- type OutboxRoute
- type OutboxStatus
- type OutboxStore
- type PanicError
- type ParallelRebuilder
- type PollingSubscription
- type ProcessorOption
- func WithBatchSize(n int) ProcessorOption
- func WithCleanupAge(d time.Duration) ProcessorOption
- func WithCleanupInterval(d time.Duration) ProcessorOption
- func WithMaxRetries(n int) ProcessorOption
- func WithOutboxMetrics(metrics OutboxMetrics) ProcessorOption
- func WithPollInterval(d time.Duration) ProcessorOption
- func WithProcessorLogger(logger Logger) ProcessorOption
- func WithPublisher(publisher Publisher) ProcessorOption
- func WithRetryBackoff(d time.Duration) ProcessorOption
- type ProgressCallback
- type Projection
- type ProjectionBase
- type ProjectionEngine
- func (e *ProjectionEngine) GetAllStatuses() []*ProjectionStatus
- func (e *ProjectionEngine) GetStatus(name string) (*ProjectionStatus, error)
- func (e *ProjectionEngine) IsRunning() bool
- func (e *ProjectionEngine) NotifyLiveProjections(ctx context.Context, events []StoredEvent)
- func (e *ProjectionEngine) ProcessInlineProjections(ctx context.Context, events []StoredEvent) error
- func (e *ProjectionEngine) RegisterAsync(projection AsyncProjection, opts ...AsyncOptions) error
- func (e *ProjectionEngine) RegisterInline(projection InlineProjection) error
- func (e *ProjectionEngine) RegisterLive(projection LiveProjection, opts ...LiveOptions) error
- func (e *ProjectionEngine) Start(ctx context.Context) error
- func (e *ProjectionEngine) Stop(ctx context.Context) error
- func (e *ProjectionEngine) Unregister(name string) error
- type ProjectionEngineOption
- type ProjectionError
- type ProjectionMetrics
- type ProjectionRebuilder
- type ProjectionRebuilderOption
- type ProjectionState
- type ProjectionStatus
- type Publisher
- type Query
- func (q *Query) And(field string, op FilterOp, value interface{}) *Query
- func (q *Query) Build() Query
- func (q *Query) OrderByAsc(field string) *Query
- func (q *Query) OrderByDesc(field string) *Query
- func (q *Query) Where(field string, op FilterOp, value interface{}) *Query
- func (q *Query) WithCount() *Query
- func (q *Query) WithLimit(limit int) *Query
- func (q *Query) WithOffset(offset int) *Query
- func (q *Query) WithPagination(page, pageSize int) *Query
- type QueryResult
- type ReadModelID
- type ReadModelRepository
- type RebuildOptions
- type RebuildProgress
- type RetryConfig
- type RetryPolicy
- type Saga
- type SagaBase
- func (s *SagaBase) Complete()
- func (s *SagaBase) CompletedAt() *time.Time
- func (s *SagaBase) CorrelationID() string
- func (s *SagaBase) CurrentStep() int
- func (s *SagaBase) Fail()
- func (s *SagaBase) IncrementVersion()
- func (s *SagaBase) MarkCompensated()
- func (s *SagaBase) SagaID() string
- func (s *SagaBase) SagaType() string
- func (s *SagaBase) SetCompletedAt(t *time.Time)
- func (s *SagaBase) SetCorrelationID(id string)
- func (s *SagaBase) SetCurrentStep(step int)
- func (s *SagaBase) SetID(id string)
- func (s *SagaBase) SetStartedAt(t time.Time)
- func (s *SagaBase) SetStatus(status SagaStatus)
- func (s *SagaBase) SetType(t string)
- func (s *SagaBase) SetVersion(v int64)
- func (s *SagaBase) StartCompensation()
- func (s *SagaBase) StartedAt() time.Time
- func (s *SagaBase) Status() SagaStatus
- func (s *SagaBase) Version() int64
- type SagaCorrelation
- type SagaFactory
- type SagaFailedError
- type SagaManager
- func (m *SagaManager) FindSagaByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)
- func (m *SagaManager) GetSaga(ctx context.Context, sagaID string) (*SagaState, error)
- func (m *SagaManager) IsRunning() bool
- func (m *SagaManager) Position() uint64
- func (m *SagaManager) ProcessEvent(ctx context.Context, event StoredEvent) error
- func (m *SagaManager) Register(sagaType string, factory SagaFactory, correlation SagaCorrelation)
- func (m *SagaManager) RegisterSimple(sagaType string, factory SagaFactory, startingEvents ...string)
- func (m *SagaManager) SetPosition(pos uint64)
- func (m *SagaManager) Start(ctx context.Context) error
- func (m *SagaManager) StartAsync(ctx context.Context) *AsyncResult
- func (m *SagaManager) StartSaga(ctx context.Context, sagaType string, triggerEvent StoredEvent) error
- func (m *SagaManager) StartSagaAsync(ctx context.Context, sagaType string, triggerEvent StoredEvent) *AsyncResult
- func (m *SagaManager) Stop()
- type SagaManagerOption
- func WithCommandBus(bus *CommandBus) SagaManagerOption
- func WithSagaLogger(logger Logger) SagaManagerOption
- func WithSagaPollInterval(d time.Duration) SagaManagerOption
- func WithSagaRetryAttempts(attempts int) SagaManagerOption
- func WithSagaRetryDelay(d time.Duration) SagaManagerOption
- func WithSagaSerializer(serializer Serializer) SagaManagerOption
- func WithSagaStore(store SagaStore) SagaManagerOption
- type SagaNotFoundError
- type SagaState
- type SagaStatus
- type SagaStep
- type SagaStepStatus
- type SagaStore
- type SchemaCompatibility
- type SchemaDefinition
- type SchemaRegistry
- func (r *SchemaRegistry) CheckCompatibility(eventType string, oldVersion, newVersion int) (SchemaCompatibility, error)
- func (r *SchemaRegistry) GetLatestVersion(eventType string) (int, error)
- func (r *SchemaRegistry) GetSchema(eventType string, version int) (*SchemaDefinition, error)
- func (r *SchemaRegistry) Register(eventType string, schema SchemaDefinition) error
- func (r *SchemaRegistry) RegisteredEventTypes() []string
- type SchemaVersionGapError
- type SerializationError
- type Serializer
- type SimpleDispatcher
- type StoredEvent
- type StreamID
- type StreamInfo
- type StreamNotFoundError
- type Subscription
- type SubscriptionAdapter
- type SubscriptionOptions
- type UpcastError
- type Upcaster
- type UpcasterChain
- func (c *UpcasterChain) HasUpcasters(eventType string) bool
- func (c *UpcasterChain) LatestVersion(eventType string) int
- func (c *UpcasterChain) Register(u Upcaster) error
- func (c *UpcasterChain) RegisteredEventTypes() []string
- func (c *UpcasterChain) Upcast(eventType string, fromVersion int, data []byte, metadata Metadata) ([]byte, int, error)
- func (c *UpcasterChain) Validate() error
- type UpcastingSerializer
- func (s *UpcastingSerializer) Chain() *UpcasterChain
- func (s *UpcastingSerializer) Deserialize(data []byte, eventType string) (interface{}, error)
- func (s *UpcastingSerializer) DeserializeWithVersion(data []byte, eventType string, schemaVersion int, metadata Metadata) (interface{}, error)
- func (s *UpcastingSerializer) Inner() Serializer
- func (s *UpcastingSerializer) Serialize(event interface{}) ([]byte, error)
- type ValidationError
- type Validator
- type ValidatorFunc
- type VersionSetter
- type VersionedAggregate
Constants ¶
const (
	// AnyVersion skips version checking, allowing append regardless of current version.
	AnyVersion int64 = -1

	// NoStream indicates the stream must not exist (for creating new streams).
	NoStream int64 = 0

	// StreamExists indicates the stream must exist (for appending to existing streams).
	StreamExists int64 = -2
)
Version constants for optimistic concurrency control.
const (
	OutboxPending    = adapters.OutboxPending
	OutboxProcessing = adapters.OutboxProcessing
	OutboxCompleted  = adapters.OutboxCompleted
	OutboxFailed     = adapters.OutboxFailed
	OutboxDeadLetter = adapters.OutboxDeadLetter
)
Outbox status constants.
const (
	SagaStatusStarted            = adapters.SagaStatusStarted
	SagaStatusRunning            = adapters.SagaStatusRunning
	SagaStatusCompleted          = adapters.SagaStatusCompleted
	SagaStatusFailed             = adapters.SagaStatusFailed
	SagaStatusCompensating       = adapters.SagaStatusCompensating
	SagaStatusCompensated        = adapters.SagaStatusCompensated
	SagaStatusCompensationFailed = adapters.SagaStatusCompensationFailed
)
Saga status constants, re-exported from the adapters package.
const (
	SagaStepPending     = adapters.SagaStepPending
	SagaStepRunning     = adapters.SagaStepRunning
	SagaStepCompleted   = adapters.SagaStepCompleted
	SagaStepFailed      = adapters.SagaStepFailed
	SagaStepCompensated = adapters.SagaStepCompensated
)
Saga step status constants, re-exported from the adapters package.
const (
	// DefaultSchemaVersion is the schema version assumed for events without an explicit version.
	// All events stored before versioning was introduced are treated as version 1.
	DefaultSchemaVersion = 1
)
Variables ¶
var (
	// ErrEncryptionFailed indicates a field encryption operation failed.
	ErrEncryptionFailed = encryption.ErrEncryptionFailed

	// ErrDecryptionFailed indicates a field decryption operation failed.
	ErrDecryptionFailed = encryption.ErrDecryptionFailed

	// ErrKeyNotFound indicates the requested encryption key does not exist.
	ErrKeyNotFound = encryption.ErrKeyNotFound

	// ErrKeyRevoked indicates the encryption key has been revoked (crypto-shredding).
	ErrKeyRevoked = encryption.ErrKeyRevoked

	// ErrProviderClosed indicates the encryption provider has been closed.
	ErrProviderClosed = encryption.ErrProviderClosed
)
Encryption-related sentinel errors. These are aliases to the encryption package errors for compatibility.
var (
	// ErrStreamNotFound indicates the requested stream does not exist.
	ErrStreamNotFound = adapters.ErrStreamNotFound

	// ErrConcurrencyConflict indicates an optimistic concurrency violation.
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict

	// ErrEventNotFound indicates the requested event does not exist.
	ErrEventNotFound = errors.New("mink: event not found")

	// ErrSerializationFailed indicates event serialization/deserialization failed.
	ErrSerializationFailed = errors.New("mink: serialization failed")

	// ErrEventTypeNotRegistered indicates an unknown event type was encountered.
	ErrEventTypeNotRegistered = errors.New("mink: event type not registered")

	// ErrNilAggregate indicates a nil aggregate was passed.
	ErrNilAggregate = errors.New("mink: nil aggregate")

	// ErrNilStore indicates a nil event store was passed.
	ErrNilStore = errors.New("mink: nil event store")

	// ErrEmptyStreamID indicates an empty stream ID was provided.
	ErrEmptyStreamID = adapters.ErrEmptyStreamID

	// ErrNoEvents indicates no events were provided for append.
	ErrNoEvents = adapters.ErrNoEvents

	// ErrInvalidVersion indicates an invalid version number was provided.
	ErrInvalidVersion = adapters.ErrInvalidVersion

	// ErrAdapterClosed indicates the adapter has been closed.
	ErrAdapterClosed = adapters.ErrAdapterClosed

	// ErrSubscriptionNotSupported indicates the adapter does not support subscriptions.
	ErrSubscriptionNotSupported = errors.New("mink: adapter does not support subscriptions")

	// ErrHandlerNotFound indicates no handler is registered for a command type.
	ErrHandlerNotFound = errors.New("mink: handler not found")

	// ErrValidationFailed indicates command validation failed.
	ErrValidationFailed = errors.New("mink: validation failed")

	// ErrCommandAlreadyProcessed indicates an idempotent command was already processed.
	ErrCommandAlreadyProcessed = errors.New("mink: command already processed")

	// ErrNilCommand indicates a nil command was passed.
	ErrNilCommand = errors.New("mink: nil command")

	// ErrHandlerPanicked indicates a handler panicked during execution.
	ErrHandlerPanicked = errors.New("mink: handler panicked")

	// ErrCommandBusClosed indicates the command bus has been closed.
	ErrCommandBusClosed = errors.New("mink: command bus closed")

	// ErrOutboxMessageNotFound indicates the requested outbox message does not exist.
	ErrOutboxMessageNotFound = adapters.ErrOutboxMessageNotFound

	// ErrOutboxStoreClosed indicates the outbox store has been closed.
	ErrOutboxStoreClosed = errors.New("mink: outbox store closed")

	// ErrPublisherNotFound indicates no publisher is registered for a destination.
	ErrPublisherNotFound = errors.New("mink: publisher not found for destination")

	// ErrOutboxProcessorRunning indicates the outbox processor is already running.
	ErrOutboxProcessorRunning = errors.New("mink: outbox processor already running")
)
Sentinel errors for common error conditions. Use errors.Is() to check for these errors. Several of them are aliases of the adapters package errors, kept for compatibility.
var (
	// ErrNilProjection indicates a nil projection was passed.
	ErrNilProjection = errors.New("mink: nil projection")

	// ErrEmptyProjectionName indicates a projection has no name.
	ErrEmptyProjectionName = errors.New("mink: projection name is required")

	// ErrProjectionNotFound indicates the requested projection does not exist.
	ErrProjectionNotFound = errors.New("mink: projection not found")

	// ErrProjectionAlreadyRegistered indicates a projection with the same name is already registered.
	ErrProjectionAlreadyRegistered = errors.New("mink: projection already registered")

	// ErrProjectionEngineAlreadyRunning indicates the projection engine is already running.
	ErrProjectionEngineAlreadyRunning = errors.New("mink: projection engine already running")

	// ErrProjectionEngineStopped indicates the projection engine has been stopped.
	ErrProjectionEngineStopped = errors.New("mink: projection engine stopped")

	// ErrNoCheckpointStore indicates no checkpoint store was configured.
	ErrNoCheckpointStore = errors.New("mink: checkpoint store is required")

	// ErrNotImplemented indicates a method is not implemented.
	ErrNotImplemented = errors.New("mink: not implemented")

	// ErrProjectionFailed indicates a projection failed to process an event.
	ErrProjectionFailed = errors.New("mink: projection failed")
)
var (
	// ErrExportFailed indicates a data export operation failed.
	ErrExportFailed = errors.New("mink: export failed")

	// ErrSubjectIDRequired indicates the subject ID was not provided in the export request.
	ErrSubjectIDRequired = errors.New("mink: subject ID is required for export")

	// ErrNoExportSources indicates neither streams nor a filter was provided.
	ErrNoExportSources = errors.New("mink: either streams or filter is required for export")

	// ErrExportScanNotSupported indicates the adapter does not support event scanning.
	// Provide explicit stream IDs in the ExportRequest instead.
	ErrExportScanNotSupported = errors.New("mink: adapter does not support event scanning; provide explicit stream IDs")
)
Export-related sentinel errors.
var (
	// ErrNotFound indicates the requested entity was not found.
	ErrNotFound = errors.New("mink: not found")

	// ErrAlreadyExists indicates the entity already exists.
	ErrAlreadyExists = errors.New("mink: already exists")

	// ErrInvalidQuery indicates the query is invalid.
	ErrInvalidQuery = errors.New("mink: invalid query")
)
Repository-related sentinel errors.
var (
	// ErrSagaNotFound indicates the requested saga does not exist.
	ErrSagaNotFound = adapters.ErrSagaNotFound

	// ErrSagaAlreadyExists indicates a saga with the same ID already exists.
	ErrSagaAlreadyExists = adapters.ErrSagaAlreadyExists

	// ErrSagaCompleted indicates the saga has already completed.
	ErrSagaCompleted = errors.New("mink: saga already completed")

	// ErrSagaFailed indicates the saga has failed.
	ErrSagaFailed = errors.New("mink: saga failed")

	// ErrSagaCompensating indicates the saga is currently compensating.
	ErrSagaCompensating = errors.New("mink: saga is compensating")

	// ErrNoSagaHandler indicates no handler is registered for the event type.
	ErrNoSagaHandler = errors.New("mink: no saga handler for event")
)
Saga-related sentinel errors.
var (
	// ErrUpcastFailed indicates an upcaster failed to transform event data.
	ErrUpcastFailed = errors.New("mink: upcast failed")

	// ErrSchemaVersionGap indicates a gap in the upcaster chain for an event type.
	ErrSchemaVersionGap = errors.New("mink: schema version gap")

	// ErrIncompatibleSchema indicates a schema change is not backward compatible.
	ErrIncompatibleSchema = errors.New("mink: incompatible schema")

	// ErrSchemaNotFound indicates the requested schema was not found.
	ErrSchemaNotFound = errors.New("mink: schema not found")
)
Sentinel errors for event versioning and upcasting.
var NewDecryptionError = encryption.NewDecryptionError
NewDecryptionError creates a new EncryptionError for a decrypt operation.
var NewEncryptionError = encryption.NewEncryptionError
NewEncryptionError creates a new EncryptionError for an encrypt operation.
var NewKeyNotFoundError = encryption.NewKeyNotFoundError
NewKeyNotFoundError creates a new KeyNotFoundError.
var NewKeyRevokedError = encryption.NewKeyRevokedError
NewKeyRevokedError creates a new KeyRevokedError.
Functions ¶
func BuildStreamID ¶
func BuildStreamID(aggregateType, aggregateID string) string
BuildStreamID creates a stream ID from an aggregate type and ID. This follows the convention: "{Type}-{ID}"
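The "{Type}-{ID}" convention can be shown with a one-line local function. buildStreamID below is a stand-in written for this example, not the library function itself.

```go
package main

import "fmt"

// buildStreamID is a local stand-in that follows the "{Type}-{ID}" convention.
func buildStreamID(aggregateType, aggregateID string) string {
	return aggregateType + "-" + aggregateID
}

func main() {
	fmt.Println(buildStreamID("Order", "123")) // prints "Order-123"
}
```

The result matches the stream ID passed to Append and Load in the low-level examples.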
func CausationIDFromContext ¶
func CausationIDFromContext(ctx context.Context) string
CausationIDFromContext returns the causation ID from context.
func CorrelationIDFromContext ¶
CorrelationIDFromContext returns the correlation ID from context.
func GenerateIdempotencyKey ¶
GenerateIdempotencyKey generates an idempotency key from a command. The key is based on the command type and its JSON-serialized content.
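One way to derive a key from a command's type and JSON content is to hash both together. The sketch below shows the idea only: the helper name idempotencyKey, the createOrder command, the SHA-256 hash, and the zero-byte separator are all assumptions, and the key format mink actually produces may differ.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// createOrder is a hypothetical command used only for this example.
type createOrder struct {
	CustomerID string `json:"customerId"`
}

// idempotencyKey hashes the command type and the JSON-serialized command.
// The choice of SHA-256 and hex encoding is an assumption of this sketch.
func idempotencyKey(cmdType string, cmd interface{}) (string, error) {
	payload, err := json.Marshal(cmd)
	if err != nil {
		return "", fmt.Errorf("marshal command: %w", err)
	}
	h := sha256.New()
	h.Write([]byte(cmdType))
	h.Write([]byte{0}) // separator so type and payload cannot run together
	h.Write(payload)
	return hex.EncodeToString(h.Sum(nil)), nil
}

func main() {
	key, err := idempotencyKey("CreateOrder", createOrder{CustomerID: "c1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(key)
}
```

Because the key depends only on the type and the serialized content, two commands with identical content map to the same key, which is what lets a duplicate submission be detected.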
func GetCommandType ¶
func GetCommandType(cmd interface{}) string
GetCommandType returns the type name of a command using reflection. This is useful for commands that don't embed CommandBase.
func GetEncryptedFields ¶
GetEncryptedFields extracts the list of encrypted field names from event metadata.
func GetEncryptionKeyID ¶
GetEncryptionKeyID extracts the encryption key ID from event metadata.
func GetEventType ¶
func GetEventType(event interface{}) string
GetEventType returns the event type name for the given event. It uses the struct name as the type name.
func GetIdempotencyKey ¶
GetIdempotencyKey returns the idempotency key for a command. If the command implements IdempotentCommand, it uses that key. Otherwise, it generates a key from the command content.
func GetSchemaVersion ¶
GetSchemaVersion extracts the schema version from event metadata. Returns DefaultSchemaVersion if the version is not set or cannot be parsed.
func IdempotencyKeyFromField ¶
IdempotencyKeyFromField extracts the idempotency key from a field in the command. If the field is empty, it falls back to GenerateIdempotencyKey.
func IdempotencyKeyPrefix ¶
IdempotencyKeyPrefix is a convenience function to create a prefixed idempotency key.
func IsEncrypted ¶
IsEncrypted reports whether the event has encrypted fields.
func RegisterGenericHandler ¶
func RegisterGenericHandler[C Command](registry *HandlerRegistry, handler func(ctx context.Context, cmd C) (CommandResult, error))
RegisterGenericHandler is a convenience function to register a generic handler.
func SagaStateToJSON ¶
SagaStateToJSON converts saga state to JSON for persistence.
func ShouldHandleEventType ¶
ShouldHandleEventType checks if an event type should be handled given a list of handled events. Returns true if handledEvents is empty (meaning all events are handled) or if eventType is in the list. This is a utility function used by projection engine and rebuilder to filter events.
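The filtering rule described above can be sketched as follows. The function signature is not shown on this page, so the name shouldHandle and its parameter order are assumptions; only the documented behavior (an empty list means every event is handled) is taken from the description.

```go
package main

import "fmt"

// shouldHandle mirrors the documented rule: an empty handledEvents list
// means all events are handled; otherwise eventType must be in the list.
func shouldHandle(eventType string, handledEvents []string) bool {
	if len(handledEvents) == 0 {
		return true
	}
	for _, handled := range handledEvents {
		if handled == eventType {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(shouldHandle("OrderCreated", nil))                                 // true
	fmt.Println(shouldHandle("OrderCreated", []string{"OrderCreated", "ItemAdded"})) // true
	fmt.Println(shouldHandle("OrderShipped", []string{"OrderCreated"}))            // false
}
```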
func TenantIDFromContext ¶
TenantIDFromContext returns the tenant ID from context.
func WithCausationID ¶
WithCausationID returns a context with the causation ID set.
Types ¶
type Aggregate ¶
type Aggregate interface {
// AggregateID returns the unique identifier for this aggregate instance.
AggregateID() string
// AggregateType returns the type/category of this aggregate (e.g., "Order", "Customer").
AggregateType() string
// Version returns the current version of the aggregate.
// This is the number of events that have been applied.
Version() int64
// ApplyEvent applies an event to update the aggregate's state.
// This method should be idempotent and deterministic.
ApplyEvent(event interface{}) error
// UncommittedEvents returns events that have been applied but not yet persisted.
UncommittedEvents() []interface{}
// ClearUncommittedEvents removes all uncommitted events after successful persistence.
ClearUncommittedEvents()
}
Aggregate defines the interface for event-sourced aggregates. An aggregate is a domain object whose state is derived from a sequence of events.
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase provides a default partial implementation of the Aggregate interface. Embed this struct in your aggregate types to get default behavior.
func NewAggregateBase ¶
func NewAggregateBase(id, aggregateType string) AggregateBase
NewAggregateBase creates a new AggregateBase with the given ID and type.
func (*AggregateBase) AggregateID ¶
func (a *AggregateBase) AggregateID() string
AggregateID returns the aggregate's unique identifier.
func (*AggregateBase) AggregateType ¶
func (a *AggregateBase) AggregateType() string
AggregateType returns the aggregate type.
func (*AggregateBase) Apply ¶
func (a *AggregateBase) Apply(event interface{})
Apply records an event as uncommitted. This should be called by the aggregate after creating a new event. The aggregate should also update its internal state based on the event.
func (*AggregateBase) ClearUncommittedEvents ¶
func (a *AggregateBase) ClearUncommittedEvents()
ClearUncommittedEvents removes all uncommitted events.
func (*AggregateBase) HasUncommittedEvents ¶
func (a *AggregateBase) HasUncommittedEvents() bool
HasUncommittedEvents returns true if there are events waiting to be persisted.
func (*AggregateBase) IncrementVersion ¶
func (a *AggregateBase) IncrementVersion()
IncrementVersion increments the aggregate version by 1.
func (*AggregateBase) SetID ¶
func (a *AggregateBase) SetID(id string)
SetID sets the aggregate's ID.
func (*AggregateBase) SetType ¶
func (a *AggregateBase) SetType(t string)
SetType sets the aggregate type.
func (*AggregateBase) SetVersion ¶
func (a *AggregateBase) SetVersion(v int64)
SetVersion sets the aggregate version.
func (*AggregateBase) StreamID ¶
func (a *AggregateBase) StreamID() StreamID
StreamID returns the stream ID for this aggregate. The stream ID is composed of the aggregate type and ID.
func (*AggregateBase) UncommittedEvents ¶
func (a *AggregateBase) UncommittedEvents() []interface{}
UncommittedEvents returns events that haven't been persisted yet.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int64
Version returns the current version of the aggregate.
type AggregateCommand ¶
type AggregateCommand interface {
Command
// AggregateID returns the ID of the aggregate this command targets.
// Returns empty string for commands that create new aggregates.
AggregateID() string
}
AggregateCommand is a command that targets a specific aggregate.
type AggregateFactory ¶
AggregateFactory creates new aggregate instances.
type AggregateHandler ¶
type AggregateHandler[C AggregateCommand, A Aggregate] struct {
// contains filtered or unexported fields
}
AggregateHandler is a handler that works with aggregates and an event store. It loads the aggregate, executes the command, and saves the results.
func NewAggregateHandler ¶
func NewAggregateHandler[C AggregateCommand, A Aggregate](config AggregateHandlerConfig[C, A]) *AggregateHandler[C, A]
NewAggregateHandler creates a new AggregateHandler.
func (*AggregateHandler[C, A]) CommandType ¶
func (h *AggregateHandler[C, A]) CommandType() string
CommandType returns the command type this handler processes.
func (*AggregateHandler[C, A]) Handle ¶
func (h *AggregateHandler[C, A]) Handle(ctx context.Context, cmd Command) (CommandResult, error)
Handle loads the aggregate, executes the command, and saves the aggregate.
type AggregateHandlerConfig ¶
type AggregateHandlerConfig[C AggregateCommand, A Aggregate] struct {
Store *EventStore
Factory func(id string) A
Executor func(ctx context.Context, agg A, cmd C) error
NewIDFunc func() string
}
AggregateHandlerConfig configures an AggregateHandler.
type AggregateRoot ¶
type AggregateRoot interface {
Aggregate
// GetID is an alias for AggregateID for DDD conventions.
GetID() string
}
AggregateRoot is an extended interface that includes domain-driven design patterns.
type AppendOption ¶
type AppendOption func(*appendConfig)
AppendOption configures an append operation.
func ExpectVersion ¶
func ExpectVersion(v int64) AppendOption
ExpectVersion sets the expected stream version for optimistic concurrency.
func WithAppendMetadata ¶
func WithAppendMetadata(m Metadata) AppendOption
WithAppendMetadata sets metadata for all events in the append operation.
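AppendOption follows Go's functional options pattern, and ExpectVersion uses it to request an optimistic concurrency check. The sketch below models that with stand-in types: the fields of appendConfig are unexported in mink and therefore assumed here, and the in-memory stream type and the errConflict sentinel are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// appendConfig stands in for mink's unexported config; its fields are assumptions.
type appendConfig struct {
	expectedVersion int64
	checkVersion    bool
}

// appendOption mirrors the shape of AppendOption: a function that mutates the config.
type appendOption func(*appendConfig)

// expectVersion requests that the append only succeed at the given stream version.
func expectVersion(v int64) appendOption {
	return func(c *appendConfig) {
		c.expectedVersion = v
		c.checkVersion = true
	}
}

// errConflict is a hypothetical sentinel for this sketch.
var errConflict = errors.New("version conflict")

// stream is a toy in-memory stream whose version is its event count.
type stream struct{ events []string }

func (s *stream) append(events []string, opts ...appendOption) error {
	var cfg appendConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	if cfg.checkVersion && cfg.expectedVersion != int64(len(s.events)) {
		return errConflict
	}
	s.events = append(s.events, events...)
	return nil
}

func main() {
	s := &stream{}
	fmt.Println(s.append([]string{"OrderCreated"}, expectVersion(0))) // <nil>
	fmt.Println(s.append([]string{"ItemAdded"}, expectVersion(0)))    // version conflict
}
```

The second append fails because another write has already moved the stream to version 1, which is the situation optimistic concurrency is meant to detect.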
type AsyncOptions ¶
type AsyncOptions struct {
// BatchSize is the maximum number of events to process in a batch.
// Default: 100
BatchSize int
// BatchTimeout is the maximum time to wait for a full batch.
// Default: 1 second
BatchTimeout time.Duration
// PollInterval is how often to poll for new events when idle.
// Default: 100ms
PollInterval time.Duration
// RetryPolicy defines how to handle errors.
RetryPolicy RetryPolicy
// MaxRetries is the maximum number of retries for failed events.
// Default: 3
MaxRetries int
// StartFromBeginning starts processing from the beginning of the event stream.
// If false, starts from the last checkpoint.
// Default: false
StartFromBeginning bool
}
AsyncOptions configures async projection behavior.
func DefaultAsyncOptions ¶
func DefaultAsyncOptions() AsyncOptions
DefaultAsyncOptions returns the default async projection options.
type AsyncProjection ¶
type AsyncProjection interface {
Projection
// Apply processes a single event asynchronously.
Apply(ctx context.Context, event StoredEvent) error
// ApplyBatch processes multiple events in a single batch for efficiency.
// If not supported, implementations should process events sequentially.
ApplyBatch(ctx context.Context, events []StoredEvent) error
}
AsyncProjection processes events asynchronously in the background. This provides eventual consistency but better write performance and scalability.
type AsyncProjectionBase ¶
type AsyncProjectionBase struct {
ProjectionBase
}
AsyncProjectionBase provides a default implementation of AsyncProjection. Embed this struct and override Apply to create async projections.
func NewAsyncProjectionBase ¶
func NewAsyncProjectionBase(name string, handledEvents ...string) AsyncProjectionBase
NewAsyncProjectionBase creates a new AsyncProjectionBase.
func (*AsyncProjectionBase) ApplyBatch ¶
func (p *AsyncProjectionBase) ApplyBatch(ctx context.Context, events []StoredEvent) error
ApplyBatch provides a default implementation that processes events sequentially. Override this method for custom batch processing logic.
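A sequential default for batch processing amounts to calling Apply for each event in order and stopping at the first error. The following is a sketch of that behavior with stand-in types: storedEvent and its fields, the applier interface, and the counter projection are assumptions, since StoredEvent is not defined in this section.

```go
package main

import (
	"context"
	"fmt"
)

// storedEvent is a stand-in; the fields of mink's StoredEvent are not shown here.
type storedEvent struct {
	Type           string
	GlobalPosition uint64
}

// applier captures only the Apply method of an async projection.
type applier interface {
	Apply(ctx context.Context, event storedEvent) error
}

// applyBatchSequentially applies events one by one, in order, and stops
// at the first error or when the context is cancelled.
func applyBatchSequentially(ctx context.Context, p applier, events []storedEvent) error {
	for _, e := range events {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := p.Apply(ctx, e); err != nil {
			return fmt.Errorf("apply event at position %d: %w", e.GlobalPosition, err)
		}
	}
	return nil
}

// counter is a toy projection that counts applied events.
type counter struct{ n int }

func (c *counter) Apply(_ context.Context, _ storedEvent) error {
	c.n++
	return nil
}

// demo runs a two-event batch through the counter projection.
func demo() (int, error) {
	c := &counter{}
	err := applyBatchSequentially(context.Background(), c, []storedEvent{
		{Type: "OrderCreated", GlobalPosition: 1},
		{Type: "ItemAdded", GlobalPosition: 2},
	})
	return c.n, err
}

func main() {
	applied, err := demo()
	fmt.Println(applied, err) // 2 <nil>
}
```

A projection that can write many rows in one database round trip would override the batch method instead of relying on this loop.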
type AsyncResult ¶
type AsyncResult struct {
// contains filtered or unexported fields
}
AsyncResult represents the result of an asynchronous operation. It provides methods to wait for completion and check the result.
func (*AsyncResult) Cancel ¶
func (r *AsyncResult) Cancel()
Cancel cancels the async operation's context. This can be used to stop a long-running operation early.
func (*AsyncResult) Context ¶
func (r *AsyncResult) Context() context.Context
Context returns the context for this async operation.
func (*AsyncResult) Done ¶
func (r *AsyncResult) Done() <-chan struct{}
Done returns a channel that is closed when the operation completes. Use this in select statements for non-blocking wait patterns.
Example:
select {
case <-result.Done():
if err := result.Err(); err != nil {
log.Printf("Failed: %v", err)
}
case <-time.After(5 * time.Second):
result.Cancel()
log.Println("Timed out")
}
func (*AsyncResult) Err ¶
func (r *AsyncResult) Err() error
Err returns the error from the completed operation. It returns nil if the operation has not yet completed or if it completed successfully.
func (*AsyncResult) IsComplete ¶
func (r *AsyncResult) IsComplete() bool
IsComplete returns true if the operation has completed (successfully or with an error).
func (*AsyncResult) Wait ¶
func (r *AsyncResult) Wait() error
Wait blocks until the operation completes and returns any error. This is a convenience method equivalent to <-result.Done(); return result.Err()
func (*AsyncResult) WaitWithTimeout ¶
func (r *AsyncResult) WaitWithTimeout(timeout time.Duration) error
WaitWithTimeout blocks until the operation completes or the timeout expires. Returns context.DeadlineExceeded if the timeout is reached before completion.
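Waiting with a timeout is typically a select over the done channel and a timer. The sketch below shows that pattern on a stand-in asyncResult type; the fields and the finish helper are assumptions, since AsyncResult's internals are unexported.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// asyncResult is a stand-in for AsyncResult; its fields are assumptions.
type asyncResult struct {
	done chan struct{}
	err  error
}

// finish records the outcome and signals completion by closing done.
func (r *asyncResult) finish(err error) {
	r.err = err
	close(r.done)
}

// waitWithTimeout blocks until completion or until the timeout expires,
// returning context.DeadlineExceeded in the latter case.
func (r *asyncResult) waitWithTimeout(timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-r.done:
		return r.err
	case <-timer.C:
		return context.DeadlineExceeded
	}
}

// timesOut reports whether an operation that never finishes hits the deadline.
func timesOut() bool {
	r := &asyncResult{done: make(chan struct{})}
	return r.waitWithTimeout(10*time.Millisecond) == context.DeadlineExceeded
}

// completes reports whether an operation that finishes promptly returns nil.
func completes() bool {
	r := &asyncResult{done: make(chan struct{})}
	go r.finish(nil)
	return r.waitWithTimeout(time.Second) == nil
}

func main() {
	fmt.Println(timesOut(), completes()) // true true
}
```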
type CatchupSubscription ¶
type CatchupSubscription struct {
// contains filtered or unexported fields
}
CatchupSubscription provides catch-up subscription functionality. It first reads historical events from the event store, then switches to polling for new events. This ensures no events are missed during the transition.
func NewCatchupSubscription ¶
func NewCatchupSubscription(store *EventStore, fromPosition uint64, opts ...SubscriptionOptions) (*CatchupSubscription, error)
NewCatchupSubscription creates a new catch-up subscription. Call Start() to begin receiving events from the specified position.
func (*CatchupSubscription) Close ¶
func (s *CatchupSubscription) Close() error
Close stops the subscription.
func (*CatchupSubscription) Err ¶
func (s *CatchupSubscription) Err() error
Err returns any error that caused the subscription to close.
func (*CatchupSubscription) Events ¶
func (s *CatchupSubscription) Events() <-chan StoredEvent
Events returns the channel for receiving events.
func (*CatchupSubscription) Position ¶
func (s *CatchupSubscription) Position() uint64
Position returns the current position of the subscription.
type CategoryFilter ¶
type CategoryFilter struct {
// contains filtered or unexported fields
}
CategoryFilter filters events by stream category.
func NewCategoryFilter ¶
func NewCategoryFilter(category string) *CategoryFilter
NewCategoryFilter creates a filter that only matches events from streams in the category.
func (*CategoryFilter) Matches ¶
func (f *CategoryFilter) Matches(event StoredEvent) bool
Matches returns true if the event's stream is in the category.
type Checkpoint ¶
type Checkpoint struct {
// ProjectionName is the name of the projection.
ProjectionName string
// Position is the global position of the last processed event.
Position uint64
// UpdatedAt is when the checkpoint was last updated.
UpdatedAt time.Time
}
Checkpoint represents a stored checkpoint record.
type CheckpointStore ¶
type CheckpointStore interface {
// GetCheckpoint returns the last processed position for a projection.
// Returns 0 if no checkpoint exists.
GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
// SetCheckpoint stores the last processed position for a projection.
SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
// DeleteCheckpoint removes the checkpoint for a projection.
DeleteCheckpoint(ctx context.Context, projectionName string) error
// GetAllCheckpoints returns checkpoints for all projections.
GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
}
CheckpointStore manages projection checkpoints. Checkpoints track the last processed position for each projection.
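A custom CheckpointStore only needs the four methods above. As an illustration, here is a minimal in-memory sketch with the same method set; the memoryCheckpoints type and the roundTrip helper are hypothetical, and a production store would persist positions durably.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memoryCheckpoints is a hypothetical in-memory store with the same
// method set as the CheckpointStore interface.
type memoryCheckpoints struct {
	mu        sync.RWMutex
	positions map[string]uint64
}

func newMemoryCheckpoints() *memoryCheckpoints {
	return &memoryCheckpoints{positions: make(map[string]uint64)}
}

// GetCheckpoint returns 0 when no checkpoint exists, as the interface documents.
func (m *memoryCheckpoints) GetCheckpoint(_ context.Context, name string) (uint64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.positions[name], nil
}

func (m *memoryCheckpoints) SetCheckpoint(_ context.Context, name string, position uint64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.positions[name] = position
	return nil
}

func (m *memoryCheckpoints) DeleteCheckpoint(_ context.Context, name string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.positions, name)
	return nil
}

// GetAllCheckpoints returns a copy so callers cannot mutate internal state.
func (m *memoryCheckpoints) GetAllCheckpoints(_ context.Context) (map[string]uint64, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	out := make(map[string]uint64, len(m.positions))
	for name, pos := range m.positions {
		out[name] = pos
	}
	return out, nil
}

// roundTrip reads a checkpoint before and after storing a position.
func roundTrip(name string, position uint64) (before, after uint64) {
	ctx := context.Background()
	store := newMemoryCheckpoints()
	before, _ = store.GetCheckpoint(ctx, name)
	_ = store.SetCheckpoint(ctx, name, position)
	after, _ = store.GetCheckpoint(ctx, name)
	return before, after
}

func main() {
	fmt.Println(roundTrip("orders", 42)) // 0 42
}
```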
type Clearable ¶
type Clearable interface {
// Clear removes all data from the read model.
Clear(ctx context.Context) error
}
Clearable is an interface for projections that can clear their read model.
type Command ¶
type Command interface {
// CommandType returns the type identifier for this command (e.g., "CreateOrder").
CommandType() string
// Validate checks if the command is valid.
// Returns nil if valid, or an error describing validation failures.
Validate() error
}
Command represents an intent to change state in the system. Commands are the write side of CQRS and should be validated before execution.
type CommandBase ¶
type CommandBase struct {
// CommandID is an optional unique identifier for this command instance.
CommandID string `json:"commandId,omitempty"`
// CorrelationID links related commands and events for distributed tracing.
CorrelationID string `json:"correlationId,omitempty"`
// CausationID identifies the event or command that caused this command.
CausationID string `json:"causationId,omitempty"`
// Metadata contains arbitrary key-value pairs for application-specific data.
Metadata map[string]string `json:"metadata,omitempty"`
}
CommandBase provides a default partial implementation of Command. Embed this struct in your command types to get common functionality.
func (CommandBase) GetCausationID ¶
func (c CommandBase) GetCausationID() string
GetCausationID returns the causation ID.
func (CommandBase) GetCommandID ¶
func (c CommandBase) GetCommandID() string
GetCommandID returns the command ID.
func (CommandBase) GetCorrelationID ¶
func (c CommandBase) GetCorrelationID() string
GetCorrelationID returns the correlation ID.
func (CommandBase) GetMetadata ¶
func (c CommandBase) GetMetadata(key string) string
GetMetadata returns the value for a metadata key, or empty string if not found.
func (CommandBase) WithCausationID ¶
func (c CommandBase) WithCausationID(id string) CommandBase
WithCausationID returns a copy of CommandBase with the causation ID set.
func (CommandBase) WithCommandID ¶
func (c CommandBase) WithCommandID(id string) CommandBase
WithCommandID returns a copy of CommandBase with the command ID set.
func (CommandBase) WithCorrelationID ¶
func (c CommandBase) WithCorrelationID(id string) CommandBase
WithCorrelationID returns a copy of CommandBase with the correlation ID set.
func (CommandBase) WithMetadata ¶
func (c CommandBase) WithMetadata(key, value string) CommandBase
WithMetadata returns a copy of CommandBase with a metadata key-value pair added.
type CommandBus ¶
type CommandBus struct {
// contains filtered or unexported fields
}
CommandBus orchestrates command dispatching with middleware support. It routes commands to their handlers through a configurable middleware pipeline.
func NewCommandBus ¶
func NewCommandBus(opts ...CommandBusOption) *CommandBus
NewCommandBus creates a new CommandBus with the given options.
func (*CommandBus) Close ¶
func (b *CommandBus) Close() error
Close closes the command bus, preventing further dispatch operations.
func (*CommandBus) Dispatch ¶
func (b *CommandBus) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
Dispatch sends a command through the middleware pipeline to its handler.
func (*CommandBus) DispatchAll ¶
func (b *CommandBus) DispatchAll(ctx context.Context, cmds ...Command) ([]DispatchResult, error)
DispatchAll dispatches multiple commands and returns all results. Commands are dispatched sequentially in order.
func (*CommandBus) DispatchAsync ¶
func (b *CommandBus) DispatchAsync(ctx context.Context, cmd Command) <-chan DispatchResult
DispatchAsync sends a command asynchronously and returns immediately. The result can be retrieved through the returned channel.
func (*CommandBus) HandlerCount ¶
func (b *CommandBus) HandlerCount() int
HandlerCount returns the number of registered handlers.
func (*CommandBus) HasHandler ¶
func (b *CommandBus) HasHandler(cmdType string) bool
HasHandler returns true if a handler is registered for the command type.
func (*CommandBus) IsClosed ¶
func (b *CommandBus) IsClosed() bool
IsClosed returns true if the command bus has been closed.
func (*CommandBus) MiddlewareCount ¶
func (b *CommandBus) MiddlewareCount() int
MiddlewareCount returns the number of registered middleware.
func (*CommandBus) Register ¶
func (b *CommandBus) Register(handler CommandHandler)
Register adds a handler to the command bus.
func (*CommandBus) RegisterFunc ¶
func (b *CommandBus) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))
RegisterFunc registers a handler function for a command type.
func (*CommandBus) Use ¶
func (b *CommandBus) Use(middleware ...Middleware)
Use adds middleware to the command bus. Middleware is executed in the order it was added.
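The ordering guarantee can be pictured with a small chain builder. This is a sketch under assumptions: the Middleware type is not defined in this section, so the handler and middleware types below use the common "wrap the next handler" shape, which may not match mink's actual definition.

```go
package main

import "fmt"

// handler and middleware are stand-ins; mink's Middleware type is not shown here.
type handler func(cmd string) error
type middleware func(next handler) handler

// chain wraps final so that the first middleware added runs first.
// Wrapping happens in reverse so mws[0] ends up outermost.
func chain(final handler, mws ...middleware) handler {
	h := final
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tracing records its name when it runs, then calls the next handler.
func tracing(name string, trace *[]string) middleware {
	return func(next handler) handler {
		return func(cmd string) error {
			*trace = append(*trace, name)
			return next(cmd)
		}
	}
}

// runOrder returns the order in which the middleware and handler executed.
func runOrder() []string {
	var trace []string
	h := chain(func(cmd string) error {
		trace = append(trace, "handler")
		return nil
	}, tracing("first", &trace), tracing("second", &trace))
	_ = h("CreateOrder")
	return trace
}

func main() {
	fmt.Println(runOrder()) // [first second handler]
}
```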
type CommandBusOption ¶
type CommandBusOption func(*CommandBus)
CommandBusOption configures a CommandBus.
func WithHandlerRegistry ¶
func WithHandlerRegistry(registry *HandlerRegistry) CommandBusOption
WithHandlerRegistry sets a custom handler registry.
func WithMiddleware ¶
func WithMiddleware(middleware ...Middleware) CommandBusOption
WithMiddleware adds middleware to the command bus.
type CommandContext ¶
type CommandContext struct {
// Context is the standard Go context.
Context context.Context
// Command is the command being executed.
Command Command
// Result is the command execution result (set by handler).
Result CommandResult
// Metadata contains additional context data that can be set by middleware.
Metadata map[string]interface{}
}
CommandContext carries command execution context through the middleware chain.
func NewCommandContext ¶
func NewCommandContext(ctx context.Context, cmd Command) *CommandContext
NewCommandContext creates a new CommandContext.
func (*CommandContext) Get ¶
func (c *CommandContext) Get(key string) (interface{}, bool)
Get retrieves a value from the context metadata.
func (*CommandContext) GetString ¶
func (c *CommandContext) GetString(key string) string
GetString retrieves a string value from the context metadata.
func (*CommandContext) Set ¶
func (c *CommandContext) Set(key string, value interface{})
Set stores a value in the context metadata.
func (*CommandContext) SetError ¶
func (c *CommandContext) SetError(err error)
SetError sets an error result.
func (*CommandContext) SetResult ¶
func (c *CommandContext) SetResult(result CommandResult)
SetResult sets the command execution result.
func (*CommandContext) SetSuccess ¶
func (c *CommandContext) SetSuccess(aggregateID string, version int64)
SetSuccess sets a successful result.
type CommandDispatcher ¶
type CommandDispatcher interface {
// Dispatch sends a command to its handler and returns the result.
Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
}
CommandDispatcher can dispatch commands to handlers.
type CommandHandler ¶
type CommandHandler interface {
// CommandType returns the type of command this handler processes.
CommandType() string
// Handle processes the command and returns a result.
Handle(ctx context.Context, cmd Command) (CommandResult, error)
}
CommandHandler is the interface for handling a specific command type. Handlers contain the business logic for processing commands.
type CommandHandlerFunc ¶
type CommandHandlerFunc struct {
// contains filtered or unexported fields
}
CommandHandlerFunc adapts a plain function to the CommandHandler interface.
func NewCommandHandlerFunc ¶
func NewCommandHandlerFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error)) *CommandHandlerFunc
NewCommandHandlerFunc creates a new CommandHandlerFunc.
func (*CommandHandlerFunc) CommandType ¶
func (h *CommandHandlerFunc) CommandType() string
CommandType returns the command type this handler processes.
func (*CommandHandlerFunc) Handle ¶
func (h *CommandHandlerFunc) Handle(ctx context.Context, cmd Command) (CommandResult, error)
Handle processes the command.
type CommandResult ¶
type CommandResult struct {
// Success indicates whether the command executed successfully.
Success bool
// AggregateID is the ID of the aggregate affected by the command.
// For create commands, this is the ID of the newly created aggregate.
AggregateID string
// Version is the new version of the aggregate after command execution.
Version int64
// Data contains any additional result data.
Data interface{}
// Error contains the error if the command failed.
Error error
}
CommandResult represents the result of command execution. It can contain either a successful result or an error.
func IdempotencyRecordToResult ¶
func IdempotencyRecordToResult(r *IdempotencyRecord) CommandResult
IdempotencyRecordToResult converts the record to a CommandResult.
func NewErrorResult ¶
func NewErrorResult(err error) CommandResult
NewErrorResult creates a failed CommandResult.
func NewSuccessResult ¶
func NewSuccessResult(aggregateID string, version int64) CommandResult
NewSuccessResult creates a successful CommandResult.
func NewSuccessResultWithData ¶
func NewSuccessResultWithData(aggregateID string, version int64, data interface{}) CommandResult
NewSuccessResultWithData creates a successful CommandResult with additional data.
func (CommandResult) IsError ¶
func (r CommandResult) IsError() bool
IsError returns true if the command failed.
func (CommandResult) IsSuccess ¶
func (r CommandResult) IsSuccess() bool
IsSuccess returns true if the command executed successfully.
type CompositeFilter ¶
type CompositeFilter struct {
// contains filtered or unexported fields
}
CompositeFilter combines multiple filters with AND logic.
func NewCompositeFilter ¶
func NewCompositeFilter(filters ...EventFilter) *CompositeFilter
NewCompositeFilter creates a filter that matches only if all filters match.
func (*CompositeFilter) Matches ¶
func (f *CompositeFilter) Matches(event StoredEvent) bool
Matches returns true if all filters match.
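AND composition means an event is delivered only when every inner filter accepts it. The sketch below uses stand-in types to show this: storedEvent and its fields are assumptions, typeFilter is hypothetical, and deriving the category from the stream ID prefix is an assumption based on the "{Type}-{ID}" stream naming convention.

```go
package main

import (
	"fmt"
	"strings"
)

// storedEvent is a stand-in; the fields of mink's StoredEvent are not shown here.
type storedEvent struct {
	StreamID string
	Type     string
}

// eventFilter mirrors the single-method EventFilter interface.
type eventFilter interface {
	Matches(event storedEvent) bool
}

// categoryFilter assumes the category is the stream ID prefix before "-".
type categoryFilter struct{ category string }

func (f categoryFilter) Matches(e storedEvent) bool {
	return strings.HasPrefix(e.StreamID, f.category+"-")
}

// typeFilter is a hypothetical filter on the event type.
type typeFilter struct{ eventType string }

func (f typeFilter) Matches(e storedEvent) bool { return e.Type == f.eventType }

// compositeFilter matches only if every inner filter matches.
// With no inner filters it matches everything (a choice made in this sketch).
type compositeFilter struct{ filters []eventFilter }

func (f compositeFilter) Matches(e storedEvent) bool {
	for _, inner := range f.filters {
		if !inner.Matches(e) {
			return false
		}
	}
	return true
}

func main() {
	f := compositeFilter{filters: []eventFilter{
		categoryFilter{category: "Order"},
		typeFilter{eventType: "OrderCreated"},
	}}
	fmt.Println(f.Matches(storedEvent{StreamID: "Order-1", Type: "OrderCreated"}))    // true
	fmt.Println(f.Matches(storedEvent{StreamID: "Customer-1", Type: "OrderCreated"})) // false
}
```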
type ConcurrencyError ¶
ConcurrencyError provides detailed information about a concurrency conflict.
func NewConcurrencyError ¶
func NewConcurrencyError(streamID string, expected, actual int64) *ConcurrencyError
NewConcurrencyError creates a new ConcurrencyError.
func (*ConcurrencyError) Error ¶
func (e *ConcurrencyError) Error() string
Error returns the error message.
func (*ConcurrencyError) Is ¶
func (e *ConcurrencyError) Is(target error) bool
Is reports whether this error matches the target error.
func (*ConcurrencyError) Unwrap ¶
func (e *ConcurrencyError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
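Implementing Is and Unwrap lets callers test for the conflict with errors.Is and recover the details with errors.As, even when the error has been wrapped. The sketch below shows that pattern on a stand-in type: the struct fields, the errConcurrencyConflict sentinel, and the saveOrder helper are assumptions, since the fields of ConcurrencyError and the sentinel it matches are not shown in this section.

```go
package main

import (
	"errors"
	"fmt"
)

// errConcurrencyConflict is a hypothetical sentinel for this sketch.
var errConcurrencyConflict = errors.New("concurrency conflict")

// concurrencyError is a stand-in; its fields are assumptions.
type concurrencyError struct {
	StreamID string
	Expected int64
	Actual   int64
}

func (e *concurrencyError) Error() string {
	return fmt.Sprintf("stream %s: expected version %d, actual %d", e.StreamID, e.Expected, e.Actual)
}

// Is lets errors.Is match the sentinel without comparing struct values.
func (e *concurrencyError) Is(target error) bool { return target == errConcurrencyConflict }

// Unwrap exposes the sentinel to the standard unwrapping chain.
func (e *concurrencyError) Unwrap() error { return errConcurrencyConflict }

// saveOrder simulates a save that fails with a wrapped conflict.
func saveOrder() error {
	return fmt.Errorf("save order: %w", &concurrencyError{StreamID: "Order-1", Expected: 2, Actual: 3})
}

// isConflict reports whether err is, or wraps, a concurrency conflict.
func isConflict(err error) bool { return errors.Is(err, errConcurrencyConflict) }

// conflictVersions extracts the expected and actual versions, if present.
func conflictVersions(err error) (expected, actual int64, ok bool) {
	var ce *concurrencyError
	if errors.As(err, &ce) {
		return ce.Expected, ce.Actual, true
	}
	return 0, 0, false
}

func main() {
	err := saveOrder()
	fmt.Println(isConflict(err))       // true
	fmt.Println(conflictVersions(err)) // 2 3 true
}
```

A typical caller reloads the aggregate and retries the command when isConflict reports true.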
type ContextValueMiddleware ¶
type ContextValueMiddleware struct {
// contains filtered or unexported fields
}
ContextValueMiddleware adds values to the context.
func NewContextValueMiddleware ¶
func NewContextValueMiddleware(key, value interface{}) *ContextValueMiddleware
NewContextValueMiddleware creates middleware that adds a value to the context.
func (*ContextValueMiddleware) Middleware ¶
func (m *ContextValueMiddleware) Middleware() Middleware
Middleware returns the middleware function.
type DataExporter ¶
type DataExporter struct {
// contains filtered or unexported fields
}
DataExporter handles GDPR data export (right to access / right to data portability). It collects events belonging to a data subject from the event store, decrypts encrypted fields, and returns them in a portable format.
When encrypted fields cannot be decrypted (e.g., key revoked via crypto-shredding), those events are included with Redacted=true and nil Data.
There are two enumeration strategies:
- Stream-based: provide explicit stream IDs in ExportRequest.Streams.
- Scan-based: provide an ExportFilter and the exporter scans all events. Requires the adapter to implement SubscriptionAdapter.
func NewDataExporter ¶
func NewDataExporter(store *EventStore, opts ...DataExporterOption) *DataExporter
NewDataExporter creates a new DataExporter for the given event store.
func (*DataExporter) Export ¶
func (e *DataExporter) Export(ctx context.Context, req ExportRequest) (*ExportResult, error)
Export collects all matching events for a data subject and returns them. Use ExportStream for large exports that should not be held in memory.
func (*DataExporter) ExportStream ¶
func (e *DataExporter) ExportStream(ctx context.Context, req ExportRequest, handler ExportHandler) error
ExportStream calls handler for each matching event, without holding all events in memory. This is suitable for large exports. Events are yielded in stream order for stream-based export, or global position order for scan-based export. Return a non-nil error from the handler to stop the export early.
type DataExporterOption ¶
type DataExporterOption func(*DataExporter)
DataExporterOption configures a DataExporter.
func WithExportBatchSize ¶
func WithExportBatchSize(size int) DataExporterOption
WithExportBatchSize sets the number of events loaded per batch during scan-based export. Default is 1000.
func WithExportLogger ¶
func WithExportLogger(l Logger) DataExporterOption
WithExportLogger sets the logger for the data exporter.
type DispatchResult ¶
type DispatchResult struct {
CommandResult
Error error
}
DispatchResult contains the result of an asynchronous dispatch operation.
func (DispatchResult) IsSuccess ¶
func (r DispatchResult) IsSuccess() bool
IsSuccess returns true if the dispatch was successful.
type EncryptionError ¶
type EncryptionError = encryption.EncryptionError
EncryptionError provides detailed information about an encryption or decryption failure.
type EncryptionOption ¶
type EncryptionOption func(*FieldEncryptionConfig)
EncryptionOption configures a FieldEncryptionConfig.
func WithDecryptionErrorHandler ¶
func WithDecryptionErrorHandler(handler func(err error, eventType string, metadata Metadata) error) EncryptionOption
WithDecryptionErrorHandler sets a handler for decryption errors. This is used for crypto-shredding: when a key has been deleted, the handler can return nil to suppress the error or return a custom error. If the handler returns nil, the event data is returned as-is (still encrypted).
func WithDefaultKeyID ¶
func WithDefaultKeyID(keyID string) EncryptionOption
WithDefaultKeyID sets the default master key ID used when no tenant key resolver is configured or when the tenant ID is empty.
func WithEncryptedFields ¶
func WithEncryptedFields(eventType string, fields ...string) EncryptionOption
WithEncryptedFields registers field paths to encrypt for a given event type. Field paths are dot-separated JSON field names (e.g., "email", "address.street").
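A dot-separated path such as "address.street" names a field nested inside the event's JSON payload. The sketch below shows how such a path can be resolved against decoded JSON; it illustrates the path format only, and the helper names lookupPath and fieldAt are hypothetical rather than part of mink.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// lookupPath walks a decoded JSON object along a dot-separated field path.
func lookupPath(doc map[string]interface{}, path string) (interface{}, bool) {
	var cur interface{} = doc
	for _, part := range strings.Split(path, ".") {
		obj, isObject := cur.(map[string]interface{})
		if !isObject {
			return nil, false
		}
		next, found := obj[part]
		if !found {
			return nil, false
		}
		cur = next
	}
	return cur, true
}

// fieldAt decodes rawJSON and returns the value at the given path.
func fieldAt(rawJSON, path string) (interface{}, bool) {
	var doc map[string]interface{}
	if err := json.Unmarshal([]byte(rawJSON), &doc); err != nil {
		return nil, false
	}
	return lookupPath(doc, path)
}

func main() {
	payload := `{"email":"a@example.com","address":{"street":"1 Main St"}}`
	fmt.Println(fieldAt(payload, "address.street")) // 1 Main St true
	fmt.Println(fieldAt(payload, "address.zip"))    // <nil> false
}
```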
func WithEncryptionProvider ¶
func WithEncryptionProvider(p encryption.Provider) EncryptionOption
WithEncryptionProvider sets the encryption provider.
func WithTenantKeyResolver ¶
func WithTenantKeyResolver(resolver func(tenantID string) string) EncryptionOption
WithTenantKeyResolver sets a function that maps tenant IDs to master key IDs. This enables per-tenant encryption keys for multi-tenant applications.
type Event ¶
type Event struct {
// ID is the globally unique event identifier.
ID string
// StreamID identifies the stream this event belongs to.
StreamID string
// Type is the event type identifier.
Type string
// Data is the deserialized event payload.
Data interface{}
// Metadata contains contextual information.
Metadata Metadata
// Version is the position within the stream (1-based).
Version int64
// GlobalPosition is the position across all streams.
GlobalPosition uint64
// Timestamp is when the event was stored.
Timestamp time.Time
}
Event represents a deserialized event with its data as a Go type. This is the high-level representation used by applications.
func DeserializeEvent ¶
func DeserializeEvent(serializer Serializer, stored StoredEvent) (Event, error)
DeserializeEvent is a convenience function that deserializes a StoredEvent to an Event.
func EventFromStored ¶
func EventFromStored(stored StoredEvent, data interface{}) Event
EventFromStored creates an Event from a StoredEvent with deserialized data.
type EventApplier ¶
type EventApplier func(aggregate interface{}, event interface{}) error
EventApplier is a function type for applying events to aggregates.
type EventData ¶
type EventData struct {
// Type is the event type identifier (e.g., "OrderCreated").
Type string
// Data is the serialized event payload.
Data []byte
// Metadata contains optional contextual information.
Metadata Metadata
}
EventData represents an event to be stored. It contains the event type, serialized payload, and optional metadata.
func NewEventData ¶
NewEventData creates a new EventData with the given type and data.
func SerializeEvent ¶
func SerializeEvent(serializer Serializer, event interface{}, metadata Metadata) (EventData, error)
SerializeEvent is a convenience function that serializes an event and returns EventData.
func SerializeEventWithVersion ¶
func SerializeEventWithVersion(serializer Serializer, event interface{}, metadata Metadata, schemaVersion int) (EventData, error)
SerializeEventWithVersion is a convenience function that serializes an event and stamps the metadata with the given schema version.
func (EventData) WithMetadata ¶
WithMetadata returns a copy of EventData with the metadata set.
type EventFilter ¶
type EventFilter interface {
// Matches returns true if the event should be delivered.
Matches(event StoredEvent) bool
}
EventFilter determines which events should be delivered.
type EventRegistry ¶
type EventRegistry struct {
// contains filtered or unexported fields
}
EventRegistry maps event type names to Go types. It is used by the JSONSerializer to deserialize events to the correct type.
func NewEventRegistry ¶
func NewEventRegistry() *EventRegistry
NewEventRegistry creates a new empty EventRegistry.
func (*EventRegistry) Count ¶
func (r *EventRegistry) Count() int
Count returns the number of registered event types.
func (*EventRegistry) Lookup ¶
func (r *EventRegistry) Lookup(eventType string) (reflect.Type, bool)
Lookup returns the Go type for the given event type name. Returns nil and false if the type is not registered.
func (*EventRegistry) Register ¶
func (r *EventRegistry) Register(eventType string, example interface{})
Register adds a mapping from eventType to the Go type of the example. The example should be a value (not a pointer) of the event type.
func (*EventRegistry) RegisterAll ¶
func (r *EventRegistry) RegisterAll(examples ...interface{})
RegisterAll registers multiple events using their struct names as type names. Each example should be a value (not a pointer) of the event type.
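The name-to-type mapping described for EventRegistry can be sketched with the standard reflect package. This is a minimal illustration, not go-mink's implementation: the registry type and its methods below are hypothetical stand-ins that only mirror the documented behavior (explicit names for Register, struct names for RegisterAll).

```go
package main

import (
	"fmt"
	"reflect"
)

// registry is a hypothetical stand-in for EventRegistry: event type name -> Go type.
type registry struct {
	types map[string]reflect.Type
}

func newRegistry() *registry {
	return &registry{types: make(map[string]reflect.Type)}
}

// register maps an explicit name to the dynamic type of example.
func (r *registry) register(name string, example interface{}) {
	r.types[name] = reflect.TypeOf(example)
}

// registerAll derives each name from the struct name of the example value.
func (r *registry) registerAll(examples ...interface{}) {
	for _, ex := range examples {
		r.register(reflect.TypeOf(ex).Name(), ex)
	}
}

// lookup returns the registered type, or nil and false when the name is unknown.
func (r *registry) lookup(name string) (reflect.Type, bool) {
	t, ok := r.types[name]
	return t, ok
}

type OrderCreated struct{ OrderID string }

func main() {
	r := newRegistry()
	r.registerAll(OrderCreated{})

	t, ok := r.lookup("OrderCreated")
	fmt.Println(t, ok)

	// A pointer type has an empty Name(), which is one reason a sketch like
	// this needs values rather than pointers as examples.
	fmt.Println(reflect.TypeOf(&OrderCreated{}).Name() == "")
}
```

In this sketch a pointer example would be registered under an empty name, which is consistent with the advice above to pass values.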
func (*EventRegistry) RegisteredTypes ¶
func (r *EventRegistry) RegisteredTypes() []string
RegisteredTypes returns a slice of all registered event type names.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore is the main entry point for event sourcing operations. It provides methods for appending events, loading aggregates, and managing streams.
func New ¶
func New(adapter adapters.EventStoreAdapter, opts ...Option) *EventStore
New creates a new EventStore with the given adapter and options.
func (*EventStore) Adapter ¶
func (s *EventStore) Adapter() adapters.EventStoreAdapter
Adapter returns the underlying adapter.
func (*EventStore) Append ¶
func (s *EventStore) Append(ctx context.Context, streamID string, events []interface{}, opts ...AppendOption) error
Append stores events to the specified stream. Events can be Go structs which will be serialized using the configured serializer.
func (*EventStore) Close ¶
func (s *EventStore) Close() error
Close releases resources held by the event store.
func (*EventStore) GetLastPosition ¶
func (s *EventStore) GetLastPosition(ctx context.Context) (uint64, error)
GetLastPosition returns the global position of the last stored event.
func (*EventStore) GetStreamInfo ¶
func (s *EventStore) GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)
GetStreamInfo returns metadata about a stream.
func (*EventStore) Initialize ¶
func (s *EventStore) Initialize(ctx context.Context) error
Initialize sets up the required storage schema.
func (*EventStore) LoadAggregate ¶
func (s *EventStore) LoadAggregate(ctx context.Context, agg Aggregate) error
LoadAggregate loads an aggregate's state by replaying its events. The aggregate should be a new instance with its ID and type already set.
If the aggregate implements VersionSetter, the version will be set to the number of events loaded. This is required for proper optimistic concurrency control when saving the aggregate later.
Note: AggregateBase implements VersionSetter, so aggregates embedding AggregateBase will automatically have their version set correctly.
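The replay step can be pictured as follows. This is a sketch under stated assumptions: account, applyEvent, setVersion, and load are hypothetical local names, not go-mink types, and they only model the documented behavior that the version ends up equal to the number of events loaded.

```go
package main

import "fmt"

// Hypothetical events for the sketch.
type deposited struct{ Amount int }
type withdrawn struct{ Amount int }

// account is a hypothetical aggregate; it is not built on mink.AggregateBase.
type account struct {
	balance int
	version int64
}

// applyEvent mutates state for one historical event.
func (a *account) applyEvent(event interface{}) error {
	switch e := event.(type) {
	case deposited:
		a.balance += e.Amount
	case withdrawn:
		a.balance -= e.Amount
	default:
		return fmt.Errorf("unknown event type %T", event)
	}
	return nil
}

// setVersion plays the role that VersionSetter plays in the text above.
func (a *account) setVersion(v int64) { a.version = v }

// load replays the history in order, then sets the version to the number
// of events that were applied.
func load(a *account, history []interface{}) error {
	for _, ev := range history {
		if err := a.applyEvent(ev); err != nil {
			return err
		}
	}
	a.setVersion(int64(len(history)))
	return nil
}

func main() {
	a := &account{}
	history := []interface{}{deposited{Amount: 100}, withdrawn{Amount: 30}, deposited{Amount: 5}}
	if err := load(a, history); err != nil {
		panic(err)
	}
	fmt.Println(a.balance, a.version)
}
```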
func (*EventStore) LoadEventsFromPosition ¶
func (s *EventStore) LoadEventsFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)
LoadEventsFromPosition loads events starting from a global position. Returns ErrSubscriptionNotSupported if the adapter does not implement SubscriptionAdapter. This is a helper method used by ProjectionEngine and ProjectionRebuilder.
func (*EventStore) LoadFrom ¶
func (s *EventStore) LoadFrom(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)
LoadFrom retrieves events from a stream starting from the specified version.
func (*EventStore) LoadRaw ¶
func (s *EventStore) LoadRaw(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)
LoadRaw retrieves raw (non-deserialized) events from a stream.
func (*EventStore) ProcessStoredEvent ¶
func (s *EventStore) ProcessStoredEvent(ctx context.Context, stored StoredEvent) (Event, error)
ProcessStoredEvent applies decryption, upcasting, and deserialization to a stored event. This is used by components that work with raw StoredEvents (e.g., DataExporter) and need the full processing pipeline.
func (*EventStore) RegisterEvents ¶
func (s *EventStore) RegisterEvents(events ...interface{})
RegisterEvents registers event types with the serializer. This is required for deserializing events back to their original types.
func (*EventStore) RegisterUpcasters ¶
func (s *EventStore) RegisterUpcasters(upcasters ...Upcaster) error
RegisterUpcasters is a convenience method that registers upcasters with the event store. If no UpcasterChain has been configured, a new one is created automatically.
func (*EventStore) SaveAggregate ¶
func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error
SaveAggregate persists uncommitted events from an aggregate. The aggregate's version is used for optimistic concurrency control.
After a successful save, if the aggregate implements VersionSetter, the version will be updated to reflect the new stream version. This allows for subsequent modifications without reloading.
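Optimistic concurrency control of this kind is commonly implemented as an expected-version check at append time. The sketch below assumes that model; stream, appendExpecting, and errConflict are hypothetical names, and the real adapters may differ in how they detect and report conflicts.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict is a hypothetical sentinel for a failed version check.
var errConflict = errors.New("concurrency conflict")

// stream is a hypothetical in-memory event stream.
type stream struct{ events []string }

// appendExpecting appends only when the stream is still at expectedVersion.
// It returns the stream version after the call.
func (s *stream) appendExpecting(expectedVersion int64, events ...string) (int64, error) {
	current := int64(len(s.events))
	if current != expectedVersion {
		return current, errConflict
	}
	s.events = append(s.events, events...)
	return int64(len(s.events)), nil
}

func main() {
	s := &stream{}

	// First writer saves two events against an empty stream.
	v, err := s.appendExpecting(0, "OrderCreated", "ItemAdded")
	fmt.Println(v, err)

	// A second writer holding a stale version is rejected.
	_, err = s.appendExpecting(0, "ItemAdded")
	fmt.Println(errors.Is(err, errConflict))

	// Reusing the version returned by the successful save works without a reload.
	v, err = s.appendExpecting(v, "ItemAdded")
	fmt.Println(v, err)
}
```

The last call corresponds to the point made above: once the version is updated after a save, the same in-memory aggregate can be modified and saved again.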
func (*EventStore) Serializer ¶
func (s *EventStore) Serializer() Serializer
Serializer returns the event store's serializer.
type EventStoreWithOutbox ¶
type EventStoreWithOutbox struct {
// contains filtered or unexported fields
}
EventStoreWithOutbox wraps an EventStore to automatically schedule outbox messages when events are appended. If the adapter implements OutboxAppender, events and outbox messages are written atomically in the same transaction.
func NewEventStoreWithOutbox ¶
func NewEventStoreWithOutbox(store *EventStore, outboxStore OutboxStore, routes []OutboxRoute, opts ...OutboxOption) *EventStoreWithOutbox
NewEventStoreWithOutbox creates a new EventStoreWithOutbox wrapper.
func (*EventStoreWithOutbox) Append ¶
func (es *EventStoreWithOutbox) Append(ctx context.Context, streamID string, events []interface{}, opts ...AppendOption) error
Append stores events and schedules outbox messages.
func (*EventStoreWithOutbox) OutboxStore ¶
func (es *EventStoreWithOutbox) OutboxStore() OutboxStore
OutboxStore returns the underlying OutboxStore.
func (*EventStoreWithOutbox) SaveAggregate ¶
func (es *EventStoreWithOutbox) SaveAggregate(ctx context.Context, agg Aggregate) error
SaveAggregate persists uncommitted events and schedules outbox messages.
func (*EventStoreWithOutbox) Store ¶
func (es *EventStoreWithOutbox) Store() *EventStore
Store returns the underlying EventStore.
type EventSubscriber ¶
type EventSubscriber interface {
// SubscribeAll subscribes to all events starting from the given position.
SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)
// SubscribeStream subscribes to events from a specific stream.
SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (Subscription, error)
// SubscribeCategory subscribes to events from all streams in a category.
SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)
}
EventSubscriber provides event subscription capabilities.
type EventTypeFilter ¶
type EventTypeFilter struct {
// contains filtered or unexported fields
}
EventTypeFilter filters events by type.
func NewEventTypeFilter ¶
func NewEventTypeFilter(eventTypes ...string) *EventTypeFilter
NewEventTypeFilter creates a filter that only matches the specified event types.
func (*EventTypeFilter) Matches ¶
func (f *EventTypeFilter) Matches(event StoredEvent) bool
Matches returns true if the event type is in the filter.
type EventTypeNotRegisteredError ¶
type EventTypeNotRegisteredError struct {
EventType string
}
EventTypeNotRegisteredError provides detailed information about an unregistered event type.
func NewEventTypeNotRegisteredError ¶
func NewEventTypeNotRegisteredError(eventType string) *EventTypeNotRegisteredError
NewEventTypeNotRegisteredError creates a new EventTypeNotRegisteredError.
func (*EventTypeNotRegisteredError) Error ¶
func (e *EventTypeNotRegisteredError) Error() string
Error returns the error message.
func (*EventTypeNotRegisteredError) Is ¶
func (e *EventTypeNotRegisteredError) Is(target error) bool
Is reports whether this error matches the target error.
func (*EventTypeNotRegisteredError) Unwrap ¶
func (e *EventTypeNotRegisteredError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
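Error types that expose Is and Unwrap are typically inspected with errors.Is and errors.As from the standard library. The sketch below uses a local look-alike type and a hypothetical sentinel (errNotRegistered); which sentinel the real EventTypeNotRegisteredError matches is not shown in this reference.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotRegistered is a hypothetical sentinel used only in this sketch.
var errNotRegistered = errors.New("event type not registered")

// notRegisteredError mirrors the shape of the documented error type.
type notRegisteredError struct{ EventType string }

func (e *notRegisteredError) Error() string {
	return fmt.Sprintf("event type %q is not registered", e.EventType)
}

// Is lets errors.Is(err, errNotRegistered) succeed for this type.
func (e *notRegisteredError) Is(target error) bool { return target == errNotRegistered }

// Unwrap exposes the sentinel to errors.Unwrap.
func (e *notRegisteredError) Unwrap() error { return errNotRegistered }

// deserialize simulates a failure wrapped by an outer layer.
func deserialize(eventType string) error {
	return fmt.Errorf("deserialize: %w", &notRegisteredError{EventType: eventType})
}

// classify reports the offending event type (via errors.As) and whether the
// error matches the sentinel (via errors.Is).
func classify(err error) (eventType string, matched bool) {
	var detail *notRegisteredError
	if errors.As(err, &detail) {
		eventType = detail.EventType
	}
	return eventType, errors.Is(err, errNotRegistered)
}

func main() {
	eventType, matched := classify(deserialize("OrderShipped"))
	fmt.Println(eventType, matched)
}
```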
type ExportError ¶
ExportError provides detailed information about a data export failure.
func NewExportError ¶
func NewExportError(subjectID string, cause error) *ExportError
NewExportError creates a new ExportError.
func (*ExportError) Is ¶
func (e *ExportError) Is(target error) bool
Is reports whether this error matches the target error.
func (*ExportError) Unwrap ¶
func (e *ExportError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
type ExportFilter ¶
type ExportFilter func(event StoredEvent) bool
ExportFilter determines whether a stored event should be included in a data export.
func CombineFilters ¶
func CombineFilters(filters ...ExportFilter) ExportFilter
CombineFilters returns a filter that matches events passing ALL provided filters (AND logic).
func FilterByEventTypes ¶
func FilterByEventTypes(types ...string) ExportFilter
FilterByEventTypes returns a filter that matches events of any of the given types.
func FilterByMetadata ¶
func FilterByMetadata(key, value string) ExportFilter
FilterByMetadata returns a filter that matches events with a specific custom metadata key-value pair.
func FilterByStreamPrefix ¶
func FilterByStreamPrefix(prefix string) ExportFilter
FilterByStreamPrefix returns a filter that matches events from streams whose ID starts with the given prefix.
func FilterByTenantID ¶
func FilterByTenantID(tenantID string) ExportFilter
FilterByTenantID returns a filter that matches events with the given tenant ID.
func FilterByUserID ¶
func FilterByUserID(userID string) ExportFilter
FilterByUserID returns a filter that matches events with the given user ID.
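The AND composition described for CombineFilters can be sketched with plain function values. The storedEvent struct and its fields are assumptions made for this example (the real StoredEvent is defined elsewhere), and the helper names are local re-implementations rather than the library functions.

```go
package main

import (
	"fmt"
	"strings"
)

// storedEvent is a hypothetical, reduced stand-in for StoredEvent.
type storedEvent struct {
	StreamID string
	Type     string
}

type exportFilter func(storedEvent) bool

// combine matches only events that pass every filter (AND logic).
// With no filters it matches everything; that choice is an assumption.
func combine(filters ...exportFilter) exportFilter {
	return func(e storedEvent) bool {
		for _, f := range filters {
			if !f(e) {
				return false
			}
		}
		return true
	}
}

// byStreamPrefix matches events whose stream ID starts with prefix.
func byStreamPrefix(prefix string) exportFilter {
	return func(e storedEvent) bool { return strings.HasPrefix(e.StreamID, prefix) }
}

// byEventTypes matches events of any of the given types (OR within the filter).
func byEventTypes(types ...string) exportFilter {
	set := make(map[string]struct{}, len(types))
	for _, t := range types {
		set[t] = struct{}{}
	}
	return func(e storedEvent) bool {
		_, ok := set[e.Type]
		return ok
	}
}

func main() {
	f := combine(byStreamPrefix("order-"), byEventTypes("OrderCreated", "ItemAdded"))
	fmt.Println(f(storedEvent{StreamID: "order-1", Type: "ItemAdded"}))
	fmt.Println(f(storedEvent{StreamID: "cart-1", Type: "ItemAdded"}))
}
```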
type ExportHandler ¶
type ExportHandler func(ctx context.Context, event ExportedEvent) error
ExportHandler is called for each exported event during streaming export. Return a non-nil error to stop the export.
type ExportRequest ¶
type ExportRequest struct {
// SubjectID identifies the data subject (required).
SubjectID string
// Streams lists specific stream IDs to export.
// When provided, only these streams are loaded (efficient, no full scan).
Streams []string
// Filter selects which events to include.
// When Streams is empty, the exporter scans all events and applies this filter
// (requires the adapter to implement SubscriptionAdapter).
// When Streams is provided, the filter is applied within each stream.
Filter ExportFilter
// FromTime limits export to events stored at or after this time.
FromTime *time.Time
// ToTime limits export to events stored at or before this time.
ToTime *time.Time
}
ExportRequest describes what data to export for a data subject.
type ExportResult ¶
type ExportResult struct {
// SubjectID is the data subject identifier from the request.
SubjectID string
// Events contains all exported events, ordered by stream then version.
Events []ExportedEvent
// Streams lists all unique stream IDs that contained matching events.
Streams []string
// TotalEvents is the total number of events included (including redacted).
TotalEvents int
// RedactedCount is the number of events whose PII could not be decrypted
// (e.g., due to crypto-shredding / key revocation).
RedactedCount int
// ExportedAt is the timestamp when the export was generated.
ExportedAt time.Time
}
ExportResult contains all exported data for a data subject.
type ExportedEvent ¶
type ExportedEvent struct {
// StreamID identifies the stream this event belongs to.
StreamID string
// EventType is the event type identifier.
EventType string
// Data is the deserialized event payload.
// When Redacted is true, Data is nil.
Data interface{}
// RawData is the serialized event payload after decryption (JSON bytes).
// When Redacted is true, RawData contains the original (encrypted) bytes.
RawData []byte
// Metadata contains non-PII contextual information about the event.
Metadata ExportedMetadata
// Version is the position within the stream (1-based).
Version int64
// GlobalPosition is the position across all streams.
GlobalPosition uint64
// Timestamp is when the event was stored.
Timestamp time.Time
// Redacted indicates the event's encrypted fields could not be decrypted
// (e.g., because the encryption key was revoked via crypto-shredding).
Redacted bool
}
ExportedEvent represents a single event in the data export.
type ExportedMetadata ¶
type ExportedMetadata struct {
CorrelationID string
CausationID string
TenantID string
SchemaVersion int
}
ExportedMetadata contains non-PII metadata included in the export.
type FieldDefinition ¶
type FieldDefinition struct {
// Name is the field name.
Name string
// Type is the field type (e.g., "string", "int", "bool").
Type string
// Required indicates whether the field must be present.
Required bool
}
FieldDefinition describes a single field in an event schema.
type FieldEncryptionConfig ¶
type FieldEncryptionConfig struct {
// contains filtered or unexported fields
}
FieldEncryptionConfig configures per-event-type field encryption. It uses envelope encryption: a data encryption key (DEK) is generated per event, encrypted with the master key, and stored in metadata. Individual fields are encrypted locally with the DEK for performance.
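Envelope encryption as described above can be illustrated with AES-GCM from the standard library. This is a sketch of the general technique, not go-mink's code: the cipher choice, the nonce-prefixed layout, and the function names are assumptions, and a real master key would normally live in a key management service rather than in process memory.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal encrypts plaintext with AES-GCM and prepends the random nonce.
func seal(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open reverses seal: it splits off the nonce and authenticates the rest.
func open(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

// encryptField generates a fresh DEK, encrypts the field with it, and wraps
// the DEK with the master key. The wrapped DEK is what would go into metadata.
func encryptField(masterKey, field []byte) (encField, wrappedDEK []byte, err error) {
	dek := make([]byte, 32)
	if _, err = rand.Read(dek); err != nil {
		return nil, nil, err
	}
	if encField, err = seal(dek, field); err != nil {
		return nil, nil, err
	}
	if wrappedDEK, err = seal(masterKey, dek); err != nil {
		return nil, nil, err
	}
	return encField, wrappedDEK, nil
}

// decryptField unwraps the DEK with the master key, then decrypts the field.
func decryptField(masterKey, encField, wrappedDEK []byte) ([]byte, error) {
	dek, err := open(masterKey, wrappedDEK)
	if err != nil {
		return nil, fmt.Errorf("unwrap DEK: %w", err)
	}
	return open(dek, encField)
}

func main() {
	masterKey := make([]byte, 32)
	if _, err := rand.Read(masterKey); err != nil {
		panic(err)
	}
	enc, wrapped, err := encryptField(masterKey, []byte("alice@example.com"))
	if err != nil {
		panic(err)
	}
	plain, err := decryptField(masterKey, enc, wrapped)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", plain)
}
```

Because only the small DEK is encrypted with the master key, revoking or losing the master key makes every field encrypted under it unreadable, which is the basis of crypto-shredding.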
func NewFieldEncryptionConfig ¶
func NewFieldEncryptionConfig(opts ...EncryptionOption) *FieldEncryptionConfig
NewFieldEncryptionConfig creates a new FieldEncryptionConfig with the given options.
func (*FieldEncryptionConfig) HasEncryptedFields ¶
func (c *FieldEncryptionConfig) HasEncryptedFields(eventType string) bool
HasEncryptedFields reports whether any fields are configured for encryption for the given event type.
type Filter ¶
type Filter struct {
// Field is the field name to filter on.
Field string
// Op is the comparison operator.
Op FilterOp
// Value is the value to compare against.
Value interface{}
}
Filter represents a query filter condition.
type FilterOp ¶
type FilterOp string
FilterOp represents a filter operation.
const (
// FilterOpEq matches equal values.
FilterOpEq FilterOp = "="
// FilterOpNe matches not equal values.
FilterOpNe FilterOp = "!="
// FilterOpGt matches greater than values.
FilterOpGt FilterOp = ">"
// FilterOpGte matches greater than or equal values.
FilterOpGte FilterOp = ">="
// FilterOpLt matches less than values.
FilterOpLt FilterOp = "<"
// FilterOpLte matches less than or equal values.
FilterOpLte FilterOp = "<="
// FilterOpIn matches any value in a list.
FilterOpIn FilterOp = "IN"
// FilterOpNotIn matches no value in a list.
FilterOpNotIn FilterOp = "NOT IN"
// FilterOpLike matches using SQL LIKE pattern.
FilterOpLike FilterOp = "LIKE"
// FilterOpIsNull matches null values.
FilterOpIsNull FilterOp = "IS NULL"
// FilterOpIsNotNull matches non-null values.
FilterOpIsNotNull FilterOp = "IS NOT NULL"
// FilterOpContains matches arrays containing a value.
FilterOpContains FilterOp = "CONTAINS"
// FilterOpBetween matches values between two bounds.
FilterOpBetween FilterOp = "BETWEEN"
)
type GenericHandler ¶
type GenericHandler[C Command] struct {
// contains filtered or unexported fields
}
GenericHandler is a type-safe command handler for a specific command type. Use this to create handlers with compile-time type checking.
func NewGenericHandler ¶
func NewGenericHandler[C Command](handler func(ctx context.Context, cmd C) (CommandResult, error)) *GenericHandler[C]
NewGenericHandler creates a new GenericHandler for the specified command type.
func (*GenericHandler[C]) CommandType ¶
func (h *GenericHandler[C]) CommandType() string
CommandType returns the command type this handler processes.
func (*GenericHandler[C]) Handle ¶
func (h *GenericHandler[C]) Handle(ctx context.Context, cmd Command) (CommandResult, error)
Handle processes the command with type checking.
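The combination of a typed handler function and an untyped Handle method can be sketched with Go generics. The types below are local stand-ins: the command interface is assumed to expose only CommandType, result is a hypothetical replacement for CommandResult, and deriving the command type from the zero value of C is one possible approach that go-mink may or may not use.

```go
package main

import (
	"context"
	"fmt"
)

// command is an assumed, reduced form of the Command interface.
type command interface{ CommandType() string }

// result is a hypothetical stand-in for CommandResult.
type result struct{ Message string }

type genericHandler[C command] struct {
	fn func(ctx context.Context, cmd C) (result, error)
}

func newGenericHandler[C command](fn func(ctx context.Context, cmd C) (result, error)) *genericHandler[C] {
	return &genericHandler[C]{fn: fn}
}

// CommandType asks the zero value of C for its type name. This works for
// value-receiver commands like the ones below.
func (h *genericHandler[C]) CommandType() string {
	var zero C
	return zero.CommandType()
}

// Handle performs the runtime type check, then calls the typed function.
func (h *genericHandler[C]) Handle(ctx context.Context, cmd command) (result, error) {
	typed, ok := any(cmd).(C)
	if !ok {
		return result{}, fmt.Errorf("handler for %s cannot process %T", h.CommandType(), cmd)
	}
	return h.fn(ctx, typed)
}

type createOrder struct{ CustomerID string }

func (createOrder) CommandType() string { return "CreateOrder" }

type cancelOrder struct{ OrderID string }

func (cancelOrder) CommandType() string { return "CancelOrder" }

// tryDispatch sends any command to a handler that only accepts createOrder.
func tryDispatch(cmd command) (result, error) {
	h := newGenericHandler[createOrder](func(_ context.Context, c createOrder) (result, error) {
		return result{Message: "created order for " + c.CustomerID}, nil
	})
	return h.Handle(context.Background(), cmd)
}

func main() {
	fmt.Println(tryDispatch(createOrder{CustomerID: "c-1"}))
	fmt.Println(tryDispatch(cancelOrder{OrderID: "o-1"}))
}
```

The handler body receives a createOrder directly, so a mismatch between the handler and its command is caught by the compiler, while Handle still rejects foreign command types at runtime.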
type HandlerNotFoundError ¶
type HandlerNotFoundError struct {
CommandType string
}
HandlerNotFoundError provides detailed information about a missing handler.
func NewHandlerNotFoundError ¶
func NewHandlerNotFoundError(cmdType string) *HandlerNotFoundError
NewHandlerNotFoundError creates a new HandlerNotFoundError.
func (*HandlerNotFoundError) Error ¶
func (e *HandlerNotFoundError) Error() string
Error returns the error message.
func (*HandlerNotFoundError) Is ¶
func (e *HandlerNotFoundError) Is(target error) bool
Is reports whether this error matches the target error.
func (*HandlerNotFoundError) Unwrap ¶
func (e *HandlerNotFoundError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type HandlerRegistry ¶
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry manages command handler registration and lookup.
func NewHandlerRegistry ¶
func NewHandlerRegistry() *HandlerRegistry
NewHandlerRegistry creates a new HandlerRegistry.
func (*HandlerRegistry) CommandTypes ¶
func (r *HandlerRegistry) CommandTypes() []string
CommandTypes returns all registered command types.
func (*HandlerRegistry) Count ¶
func (r *HandlerRegistry) Count() int
Count returns the number of registered handlers.
func (*HandlerRegistry) Get ¶
func (r *HandlerRegistry) Get(cmdType string) CommandHandler
Get returns the handler for a command type. Returns nil if no handler is registered.
func (*HandlerRegistry) Has ¶
func (r *HandlerRegistry) Has(cmdType string) bool
Has returns true if a handler is registered for the command type.
func (*HandlerRegistry) Register ¶
func (r *HandlerRegistry) Register(handler CommandHandler)
Register adds a handler for a command type. If a handler is already registered for this type, it will be replaced.
func (*HandlerRegistry) RegisterFunc ¶
func (r *HandlerRegistry) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))
RegisterFunc registers a handler function for a command type.
func (*HandlerRegistry) Remove ¶
func (r *HandlerRegistry) Remove(cmdType string)
Remove removes a handler for a command type.
type IdempotencyConfig ¶
type IdempotencyConfig struct {
// Store is the idempotency store to use.
Store IdempotencyStore
// TTL is how long to keep idempotency records.
// Default is 24 hours.
TTL time.Duration
// KeyGenerator generates idempotency keys from commands.
// If nil, GetIdempotencyKey is used.
KeyGenerator func(Command) string
// StoreErrors determines if failed commands should be stored.
// If true, replaying a failed command returns the same error.
// If false, failed commands can be retried.
// Default is false.
StoreErrors bool
// SkipCommands is a list of command types to skip idempotency checking.
SkipCommands []string
}
IdempotencyConfig configures the idempotency middleware.
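The effect of StoreErrors can be illustrated with a small in-memory wrapper. This is only a sketch: it omits the context parameter, the TTL, and concurrency safety, replays by returning the recorded outcome (the real middleware may instead report a replay differently), and every name in it is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

type command interface{ CommandType() string }

// idempotentCommand mirrors the documented IdempotentCommand shape.
type idempotentCommand interface {
	command
	IdempotencyKey() string
}

type result struct{ Message string }

// handlerFunc omits context.Context for brevity.
type handlerFunc func(cmd command) (result, error)

type record struct {
	res result
	err error
}

// idempotent wraps next so that commands with a known key are not re-run.
// Failed commands are recorded only when storeErrors is true.
func idempotent(storeErrors bool, next handlerFunc) handlerFunc {
	seen := make(map[string]record) // not safe for concurrent use
	return func(cmd command) (result, error) {
		ic, ok := cmd.(idempotentCommand)
		if !ok {
			return next(cmd)
		}
		key := ic.IdempotencyKey()
		if rec, done := seen[key]; done {
			return rec.res, rec.err
		}
		res, err := next(cmd)
		if err == nil || storeErrors {
			seen[key] = record{res: res, err: err}
		}
		return res, err
	}
}

type pay struct{ RequestID string }

func (pay) CommandType() string      { return "Pay" }
func (p pay) IdempotencyKey() string { return p.RequestID }

// countCalls sends the same command n times and reports how often the
// wrapped handler actually ran.
func countCalls(storeErrors, fail bool, n int) int {
	calls := 0
	h := idempotent(storeErrors, func(command) (result, error) {
		calls++
		if fail {
			return result{}, errors.New("payment declined")
		}
		return result{Message: "paid"}, nil
	})
	for i := 0; i < n; i++ {
		_, _ = h(pay{RequestID: "req-1"})
	}
	return calls
}

func main() {
	fmt.Println(countCalls(false, false, 3)) // successes are always recorded
	fmt.Println(countCalls(false, true, 3))  // failures are retried
	fmt.Println(countCalls(true, true, 3))   // failures are replayed
}
```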
func DefaultIdempotencyConfig ¶
func DefaultIdempotencyConfig(store IdempotencyStore) IdempotencyConfig
DefaultIdempotencyConfig returns a default idempotency configuration.
type IdempotencyRecord ¶
type IdempotencyRecord = adapters.IdempotencyRecord
IdempotencyRecord stores information about a processed command.
func NewIdempotencyRecord ¶
func NewIdempotencyRecord(key, cmdType string, result CommandResult, ttl time.Duration) *IdempotencyRecord
NewIdempotencyRecord creates a new IdempotencyRecord from a CommandResult.
type IdempotencyReplayError ¶
IdempotencyReplayError indicates a command was already processed.
func (*IdempotencyReplayError) Error ¶
func (e *IdempotencyReplayError) Error() string
func (*IdempotencyReplayError) Is ¶
func (e *IdempotencyReplayError) Is(target error) bool
func (*IdempotencyReplayError) Unwrap ¶
func (e *IdempotencyReplayError) Unwrap() error
type IdempotencyStore ¶
type IdempotencyStore = adapters.IdempotencyStore
IdempotencyStore tracks processed commands to prevent duplicate processing.
type IdempotentCommand ¶
type IdempotentCommand interface {
Command
// IdempotencyKey returns a unique key for deduplication.
// Commands with the same key will only be processed once.
IdempotencyKey() string
}
IdempotentCommand is a command that supports idempotency.
type InMemoryRepository ¶
type InMemoryRepository[T any] struct {
// contains filtered or unexported fields
}
InMemoryRepository provides an in-memory implementation of ReadModelRepository. Useful for testing and prototyping.
func NewInMemoryRepository ¶
func NewInMemoryRepository[T any](getID func(*T) string) *InMemoryRepository[T]
NewInMemoryRepository creates a new in-memory repository. The getID function extracts the ID from a read model.
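The role of the getID function can be sketched with a small generic map-backed store. The repo type, its methods, and both error values are hypothetical; in particular, how the real repository reports duplicates or missing IDs is not specified in this reference.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Hypothetical errors for this sketch.
var (
	errNotFound      = errors.New("read model not found")
	errAlreadyExists = errors.New("read model already exists")
)

// repo is a hypothetical, reduced in-memory repository keyed by getID.
type repo[T any] struct {
	mu    sync.RWMutex
	getID func(*T) string
	items map[string]*T
}

func newRepo[T any](getID func(*T) string) *repo[T] {
	return &repo[T]{getID: getID, items: make(map[string]*T)}
}

// insert stores the model under the ID extracted by getID.
func (r *repo[T]) insert(model *T) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.getID(model)
	if _, exists := r.items[id]; exists {
		return errAlreadyExists
	}
	r.items[id] = model
	return nil
}

// get returns the model stored under id.
func (r *repo[T]) get(id string) (*T, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	model, ok := r.items[id]
	if !ok {
		return nil, errNotFound
	}
	return model, nil
}

type orderSummary struct {
	OrderID string
	Total   float64
}

func main() {
	r := newRepo[orderSummary](func(s *orderSummary) string { return s.OrderID })
	if err := r.insert(&orderSummary{OrderID: "order-1", Total: 42.5}); err != nil {
		panic(err)
	}
	s, err := r.get("order-1")
	fmt.Println(s.Total, err)
}
```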
func (*InMemoryRepository[T]) Clear ¶
func (r *InMemoryRepository[T]) Clear(ctx context.Context) error
Clear removes all read models.
func (*InMemoryRepository[T]) Count ¶
Count returns the number of read models matching the query. Note: This basic implementation ignores query filters and returns total count. For production use with filtering, implement a database-backed repository.
func (*InMemoryRepository[T]) Delete ¶
func (r *InMemoryRepository[T]) Delete(ctx context.Context, id string) error
Delete removes a read model by ID.
func (*InMemoryRepository[T]) DeleteMany ¶
DeleteMany removes all read models matching the query. Note: This basic implementation does not evaluate query filters. When filters are provided, it deletes all items regardless of whether they match. For production use with filtering, implement a database-backed repository.
func (*InMemoryRepository[T]) Find ¶
func (r *InMemoryRepository[T]) Find(ctx context.Context, query Query) ([]*T, error)
Find queries read models with the given criteria. Note: This is a basic implementation that doesn't support all filter operations.
func (*InMemoryRepository[T]) FindOne ¶
func (r *InMemoryRepository[T]) FindOne(ctx context.Context, query Query) (*T, error)
FindOne returns the first read model matching the query.
func (*InMemoryRepository[T]) Get ¶
func (r *InMemoryRepository[T]) Get(ctx context.Context, id string) (*T, error)
Get retrieves a read model by ID.
func (*InMemoryRepository[T]) GetAll ¶
func (r *InMemoryRepository[T]) GetAll(ctx context.Context) ([]*T, error)
GetAll returns all read models in the repository.
func (*InMemoryRepository[T]) GetMany ¶
func (r *InMemoryRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)
GetMany retrieves multiple read models by their IDs.
func (*InMemoryRepository[T]) Insert ¶
func (r *InMemoryRepository[T]) Insert(ctx context.Context, model *T) error
Insert creates a new read model.
func (*InMemoryRepository[T]) Len ¶
func (r *InMemoryRepository[T]) Len() int
Len returns the number of items in the repository.
type IncompatibleSchemaError ¶
type IncompatibleSchemaError struct {
EventType string
OldVersion int
NewVersion int
Compatibility SchemaCompatibility
Reason string
}
IncompatibleSchemaError provides detailed information about a schema incompatibility.
func NewIncompatibleSchemaError ¶
func NewIncompatibleSchemaError(eventType string, oldVersion, newVersion int, compatibility SchemaCompatibility, reason string) *IncompatibleSchemaError
NewIncompatibleSchemaError creates a new IncompatibleSchemaError.
func (*IncompatibleSchemaError) Error ¶
func (e *IncompatibleSchemaError) Error() string
Error returns the error message.
func (*IncompatibleSchemaError) Is ¶
func (e *IncompatibleSchemaError) Is(target error) bool
Is reports whether this error matches the target error.
func (*IncompatibleSchemaError) Unwrap ¶
func (e *IncompatibleSchemaError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type InlineProjection ¶
type InlineProjection interface {
Projection
// Apply processes a single event within the event store transaction.
// The projection should update its read model based on the event.
Apply(ctx context.Context, event StoredEvent) error
}
InlineProjection processes events in the same transaction as the event store append. This provides strong consistency but may impact write performance.
type JSONSerializer ¶
type JSONSerializer struct {
// contains filtered or unexported fields
}
JSONSerializer is the default Serializer implementation using JSON encoding.
func NewJSONSerializer ¶
func NewJSONSerializer() *JSONSerializer
NewJSONSerializer creates a new JSONSerializer with an empty registry.
func NewJSONSerializerWithRegistry ¶
func NewJSONSerializerWithRegistry(registry *EventRegistry) *JSONSerializer
NewJSONSerializerWithRegistry creates a new JSONSerializer with the given registry.
func (*JSONSerializer) Deserialize ¶
func (s *JSONSerializer) Deserialize(data []byte, eventType string) (interface{}, error)
Deserialize converts JSON bytes back to an event. If the event type is registered, returns a value of that type. Otherwise, returns a map[string]interface{}.
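The two outcomes described for Deserialize (a typed value for registered events, a generic map otherwise) can be sketched with encoding/json and reflect. The deserialize and knownTypes functions are hypothetical helpers for this example, not the JSONSerializer implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

type ItemAdded struct {
	SKU      string `json:"sku"`
	Quantity int    `json:"quantity"`
}

// knownTypes plays the role of the registry in this sketch.
func knownTypes() map[string]reflect.Type {
	return map[string]reflect.Type{"ItemAdded": reflect.TypeOf(ItemAdded{})}
}

// deserialize decodes into the registered Go type when there is one and
// falls back to map[string]interface{} otherwise.
func deserialize(types map[string]reflect.Type, data []byte, eventType string) (interface{}, error) {
	t, ok := types[eventType]
	if !ok {
		var generic map[string]interface{}
		if err := json.Unmarshal(data, &generic); err != nil {
			return nil, err
		}
		return generic, nil
	}
	ptr := reflect.New(t) // pointer to a new zero value of the registered type
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil // return the value, not the pointer
}

func main() {
	payload := []byte(`{"sku":"A1","quantity":2}`)

	typed, err := deserialize(knownTypes(), payload, "ItemAdded")
	fmt.Printf("%T %v\n", typed, err)

	untyped, err := deserialize(knownTypes(), payload, "SomethingElse")
	fmt.Printf("%T %v\n", untyped, err)
}
```

Callers that type-switch on concrete event structs therefore need the event registered first; otherwise they receive the map form.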
func (*JSONSerializer) Register ¶
func (s *JSONSerializer) Register(eventType string, example interface{})
Register adds an event type to the serializer's registry.
func (*JSONSerializer) RegisterAll ¶
func (s *JSONSerializer) RegisterAll(examples ...interface{})
RegisterAll registers multiple events using their struct names as type names.
func (*JSONSerializer) Registry ¶
func (s *JSONSerializer) Registry() *EventRegistry
Registry returns the underlying EventRegistry.
func (*JSONSerializer) Serialize ¶
func (s *JSONSerializer) Serialize(event interface{}) ([]byte, error)
Serialize converts an event to JSON bytes.
type KeyNotFoundError ¶
type KeyNotFoundError = encryption.KeyNotFoundError
KeyNotFoundError provides detailed information about a missing encryption key.
type KeyRevokedError ¶
type KeyRevokedError = encryption.KeyRevokedError
KeyRevokedError provides detailed information about a revoked encryption key.
type LiveOptions ¶
type LiveOptions struct {
// BufferSize is the size of the event channel buffer.
// Default: 1000
BufferSize int
}
LiveOptions configures live projection behavior.
func DefaultLiveOptions ¶
func DefaultLiveOptions() LiveOptions
DefaultLiveOptions returns the default live projection options.
type LiveProjection ¶
type LiveProjection interface {
Projection
// OnEvent is called for each event in real time.
// This method should not block for long periods.
OnEvent(ctx context.Context, event StoredEvent)
// IsTransient returns true if this projection doesn't persist state.
// Transient projections are not checkpointed.
IsTransient() bool
}
LiveProjection receives events in real time, for use cases such as dashboards and notifications. Projections that report IsTransient as true don't persist state and are not checkpointed.
type LiveProjectionBase ¶
type LiveProjectionBase struct {
ProjectionBase
// contains filtered or unexported fields
}
LiveProjectionBase provides a default implementation of LiveProjection. Embed this struct and override OnEvent to create live projections.
func NewLiveProjectionBase ¶
func NewLiveProjectionBase(name string, transient bool, handledEvents ...string) LiveProjectionBase
NewLiveProjectionBase creates a new LiveProjectionBase.
func (*LiveProjectionBase) IsTransient ¶
func (p *LiveProjectionBase) IsTransient() bool
IsTransient returns whether this projection is transient.
type Logger ¶
type Logger interface {
Debug(msg string, args ...interface{})
Info(msg string, args ...interface{})
Warn(msg string, args ...interface{})
Error(msg string, args ...interface{})
}
Logger defines the logging interface for the event store.
type LoggingMiddleware ¶
type LoggingMiddleware struct {
// contains filtered or unexported fields
}
LoggingMiddleware logs command execution.
func NewLoggingMiddleware ¶
func NewLoggingMiddleware(logger Logger) *LoggingMiddleware
NewLoggingMiddleware creates a new LoggingMiddleware.
func (*LoggingMiddleware) Middleware ¶
func (m *LoggingMiddleware) Middleware() Middleware
Middleware returns the middleware function.
type Metadata ¶
type Metadata struct {
// CorrelationID links related events across services for distributed tracing.
CorrelationID string `json:"correlationId,omitempty"`
// CausationID identifies the event or command that caused this event.
CausationID string `json:"causationId,omitempty"`
// UserID identifies the user who triggered this event.
UserID string `json:"userId,omitempty"`
// TenantID identifies the tenant for multi-tenant applications.
TenantID string `json:"tenantId,omitempty"`
// Custom contains arbitrary key-value pairs for application-specific metadata.
Custom map[string]string `json:"custom,omitempty"`
}
Metadata contains contextual information about an event. It supports distributed tracing, multi-tenancy, and custom key-value pairs.
func SetSchemaVersion ¶
SetSchemaVersion returns a copy of Metadata with the schema version set.
func (Metadata) WithCausationID ¶
WithCausationID returns a copy of Metadata with the causation ID set.
func (Metadata) WithCorrelationID ¶
WithCorrelationID returns a copy of Metadata with the correlation ID set.
func (Metadata) WithCustom ¶
WithCustom returns a copy of Metadata with a custom key-value pair added.
func (Metadata) WithTenantID ¶
WithTenantID returns a copy of Metadata with the tenant ID set.
func (Metadata) WithUserID ¶
WithUserID returns a copy of Metadata with the user ID set.
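The copy-returning With methods rely on value receivers. The sketch below uses a reduced, hypothetical metadata struct to show the pattern; copying the Custom map before adding a key is a choice made here so that the original value is not affected, and this reference does not state whether go-mink does the same.

```go
package main

import "fmt"

// metadata is a hypothetical, reduced version of Metadata.
type metadata struct {
	CorrelationID string
	Custom        map[string]string
}

// withCorrelationID has a value receiver, so m is already a copy.
func (m metadata) withCorrelationID(id string) metadata {
	m.CorrelationID = id
	return m
}

// withCustom copies the map first; assigning into m.Custom directly would
// also change the caller's map, because maps are reference types.
func (m metadata) withCustom(key, value string) metadata {
	custom := make(map[string]string, len(m.Custom)+1)
	for k, v := range m.Custom {
		custom[k] = v
	}
	custom[key] = value
	m.Custom = custom
	return m
}

func main() {
	base := metadata{}.withCorrelationID("corr-1").withCustom("source", "api")
	derived := base.withCustom("region", "eu")

	fmt.Println(len(base.Custom), len(derived.Custom))
	fmt.Println(derived.CorrelationID)
}
```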
type MetricsCollector ¶
type MetricsCollector interface {
// RecordCommand records a command execution.
RecordCommand(cmdType string, duration time.Duration, success bool, err error)
}
MetricsCollector collects metrics about command execution.
type Middleware ¶
type Middleware func(next MiddlewareFunc) MiddlewareFunc
Middleware wraps a handler function with additional functionality.
func CausationIDMiddleware ¶
func CausationIDMiddleware() Middleware
CausationIDMiddleware creates middleware that propagates causation IDs. The causation ID links events/commands to the command that caused them. This is essential for tracking the chain of causality in event sourcing.
func ChainMiddleware ¶
func ChainMiddleware(middleware ...Middleware) Middleware
ChainMiddleware creates a single middleware from multiple middleware.
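Composing middleware of this shape usually means wrapping the handler from the inside out. The sketch below re-declares the two function types locally with a hypothetical command and result, and it assumes that the first middleware listed becomes the outermost wrapper; the ordering ChainMiddleware actually uses is not stated in this reference.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Local stand-ins for the documented types.
type command interface{ CommandType() string }
type result struct{}
type middlewareFunc func(ctx context.Context, cmd command) (result, error)
type middleware func(next middlewareFunc) middlewareFunc

// chain folds the list from the end so that mws[0] ends up outermost.
func chain(mws ...middleware) middleware {
	return func(next middlewareFunc) middlewareFunc {
		for i := len(mws) - 1; i >= 0; i-- {
			next = mws[i](next)
		}
		return next
	}
}

// tag records when it runs relative to the rest of the chain.
func tag(name string, trace *[]string) middleware {
	return func(next middlewareFunc) middlewareFunc {
		return func(ctx context.Context, cmd command) (result, error) {
			*trace = append(*trace, name+":before")
			res, err := next(ctx, cmd)
			*trace = append(*trace, name+":after")
			return res, err
		}
	}
}

type ping struct{}

func (ping) CommandType() string { return "Ping" }

// runTrace executes one command through chain(a, b) and returns the call order.
func runTrace() string {
	var trace []string
	handler := func(ctx context.Context, cmd command) (result, error) {
		trace = append(trace, "handler")
		return result{}, nil
	}
	wrapped := chain(tag("a", &trace), tag("b", &trace))(handler)
	if _, err := wrapped(context.Background(), ping{}); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(trace, ",")
}

func main() {
	fmt.Println(runTrace())
}
```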
func CommandTypeMiddleware ¶
func CommandTypeMiddleware(types []string, middleware Middleware) Middleware
CommandTypeMiddleware applies middleware only for specific command types.
func ConditionalMiddleware ¶
func ConditionalMiddleware(condition func(Command) bool, middleware Middleware) Middleware
ConditionalMiddleware applies middleware only if the condition is true.
func CorrelationIDMiddleware ¶
func CorrelationIDMiddleware(generator func() string) Middleware
CorrelationIDMiddleware creates middleware that propagates correlation IDs.
func IdempotencyMiddleware ¶
func IdempotencyMiddleware(config IdempotencyConfig) Middleware
IdempotencyMiddleware creates middleware that prevents duplicate command processing.
func MetricsMiddleware ¶
func MetricsMiddleware(collector MetricsCollector) Middleware
MetricsMiddleware creates middleware that records metrics.
func RecoveryMiddleware ¶
func RecoveryMiddleware() Middleware
RecoveryMiddleware recovers from panics in handlers and returns them as errors. It captures a sanitized representation of the command data for debugging.
func RetryMiddleware ¶
func RetryMiddleware(config RetryConfig) Middleware
RetryMiddleware creates middleware that retries failed commands.
func TenantMiddleware ¶
func TenantMiddleware(extractor func(Command) string, required bool) Middleware
TenantMiddleware extracts and validates tenant ID.
func TimeoutMiddleware ¶
func TimeoutMiddleware(timeout time.Duration) Middleware
TimeoutMiddleware adds a timeout to command execution.
func ValidationMiddleware ¶
func ValidationMiddleware() Middleware
ValidationMiddleware validates commands before they reach the handler. If validation fails, the command is not dispatched.
type MiddlewareFunc ¶
type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)
MiddlewareFunc is the function signature for command middleware.
type MultiValidationError ¶
type MultiValidationError struct {
// CommandType is the type of command that failed validation.
CommandType string
// Errors contains all validation errors.
Errors []*ValidationError
}
MultiValidationError contains multiple validation errors.
func NewMultiValidationError ¶
func NewMultiValidationError(cmdType string) *MultiValidationError
NewMultiValidationError creates a new MultiValidationError.
func (*MultiValidationError) Add ¶
func (e *MultiValidationError) Add(err *ValidationError)
Add adds a validation error.
func (*MultiValidationError) AddField ¶
func (e *MultiValidationError) AddField(field, message string)
AddField adds a validation error for a specific field.
func (*MultiValidationError) Error ¶
func (e *MultiValidationError) Error() string
Error returns the error message.
func (*MultiValidationError) HasErrors ¶
func (e *MultiValidationError) HasErrors() bool
HasErrors returns true if there are any validation errors.
func (*MultiValidationError) Is ¶
func (e *MultiValidationError) Is(target error) bool
Is reports whether this error matches the target error.
func (*MultiValidationError) Unwrap ¶
func (e *MultiValidationError) Unwrap() error
Unwrap returns the first error for errors.Unwrap().
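As a rough illustration of how the methods above fit together, the sketch below re-implements a small version of the type locally. ValidationError's fields, the ErrValidation sentinel, the Error message format, validateCreateOrder, and failedFields are assumptions made for this example and may differ from the library.

```go
package main

import (
	"errors"
	"strings"
)

// ErrValidation is a hypothetical sentinel used to demonstrate Is.
var ErrValidation = errors.New("validation failed")

// ValidationError is a local stand-in; its fields are assumptions.
type ValidationError struct {
	Field   string
	Message string
}

func (e *ValidationError) Error() string { return e.Field + ": " + e.Message }

// MultiValidationError mirrors the struct documented above.
type MultiValidationError struct {
	CommandType string
	Errors      []*ValidationError
}

func (e *MultiValidationError) AddField(field, message string) {
	e.Errors = append(e.Errors, &ValidationError{Field: field, Message: message})
}

func (e *MultiValidationError) HasErrors() bool { return len(e.Errors) > 0 }

// Error joins all field errors; the exact format here is illustrative.
func (e *MultiValidationError) Error() string {
	parts := make([]string, len(e.Errors))
	for i, v := range e.Errors {
		parts[i] = v.Error()
	}
	return e.CommandType + ": " + strings.Join(parts, "; ")
}

// Is lets errors.Is(err, ErrValidation) succeed for this type.
func (e *MultiValidationError) Is(target error) bool { return target == ErrValidation }

// Unwrap returns the first collected error, or nil when there are none.
func (e *MultiValidationError) Unwrap() error {
	if len(e.Errors) == 0 {
		return nil
	}
	return e.Errors[0]
}

// validateCreateOrder collects every problem instead of stopping at the first.
func validateCreateOrder(customerID string, quantity int) error {
	errs := &MultiValidationError{CommandType: "CreateOrder"}
	if customerID == "" {
		errs.AddField("customerId", "is required")
	}
	if quantity <= 0 {
		errs.AddField("quantity", "must be positive")
	}
	if errs.HasErrors() {
		return errs
	}
	return nil // return an untyped nil so callers can compare against nil
}

// failedFields lists the invalid fields, or nil if err is not a validation error.
func failedFields(err error) []string {
	if !errors.Is(err, ErrValidation) {
		return nil
	}
	var multi *MultiValidationError
	if !errors.As(err, &multi) {
		return nil
	}
	fields := make([]string, 0, len(multi.Errors))
	for _, v := range multi.Errors {
		fields = append(fields, v.Field)
	}
	return fields
}
```

Returning a plain nil from validateCreateOrder when HasErrors is false avoids handing callers a non-nil error interface that wraps an empty *MultiValidationError.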
type Option ¶
type Option func(*EventStore)
Option configures an EventStore.
func WithFieldEncryption ¶
func WithFieldEncryption(config *FieldEncryptionConfig) Option
WithFieldEncryption configures the event store with field-level encryption. When set, configured fields are automatically encrypted during appending and decrypted during loading. Uses envelope encryption for performance.
func WithSerializer ¶
func WithSerializer(s Serializer) Option
WithSerializer sets a custom serializer.
func WithUpcasters ¶
func WithUpcasters(chain *UpcasterChain) Option
WithUpcasters configures the event store with an upcaster chain for transparent schema evolution. When set, events are automatically upcasted to the latest schema version during loading and stamped with the latest version during appending.
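Option follows the functional options pattern. The sketch below shows that pattern in isolation with local stand-in types. The Serialize method, the unexported serializer field, newEventStore, jsonSerializer, and constSerializer are all assumptions for illustration and do not describe the library's internals or its constructor signature.

```go
package main

import "encoding/json"

// Serializer is a local stand-in with a single assumed method.
type Serializer interface {
	Serialize(event interface{}) ([]byte, error)
}

// jsonSerializer is the default used by this sketch.
type jsonSerializer struct{}

func (jsonSerializer) Serialize(event interface{}) ([]byte, error) {
	return json.Marshal(event)
}

// constSerializer is a toy serializer that ignores its input.
type constSerializer string

func (c constSerializer) Serialize(interface{}) ([]byte, error) {
	return []byte(c), nil
}

// EventStore is a local stand-in holding only what the sketch needs.
type EventStore struct {
	serializer Serializer
}

// Option matches the documented shape: a function that mutates the store.
type Option func(*EventStore)

// WithSerializer returns an Option that replaces the serializer.
func WithSerializer(s Serializer) Option {
	return func(es *EventStore) { es.serializer = s }
}

// newEventStore sets defaults first and then applies options in order,
// so a later option overrides an earlier one.
func newEventStore(opts ...Option) *EventStore {
	es := &EventStore{serializer: jsonSerializer{}}
	for _, opt := range opts {
		opt(es)
	}
	return es
}
```

Because options are applied after the defaults, callers only name the settings they want to change.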
type OrderBy ¶
type OrderBy struct {
// Field is the field name to sort by.
Field string
// Desc specifies descending order.
Desc bool
}
OrderBy represents a sort order.
type OutboxMessage ¶
type OutboxMessage = adapters.OutboxMessage
OutboxMessage represents a message in the transactional outbox.
type OutboxMetrics ¶
type OutboxMetrics interface {
RecordMessageProcessed(destination string, success bool)
RecordMessageFailed(destination string)
RecordMessageDeadLettered()
RecordBatchDuration(duration time.Duration)
RecordPendingMessages(count int64)
}
OutboxMetrics collects metrics about outbox processing.
type OutboxOption ¶
type OutboxOption func(*EventStoreWithOutbox)
OutboxOption configures an EventStoreWithOutbox.
func WithOutboxLogger ¶
func WithOutboxLogger(l Logger) OutboxOption
WithOutboxLogger sets a logger for the outbox wrapper.
func WithOutboxMaxAttempts ¶
func WithOutboxMaxAttempts(n int) OutboxOption
WithOutboxMaxAttempts sets the default max attempts for outbox messages.
type OutboxProcessor ¶
type OutboxProcessor struct {
// contains filtered or unexported fields
}
OutboxProcessor polls the outbox store for pending messages and publishes them via registered publishers. It handles retries, dead-lettering, and cleanup.
func NewOutboxProcessor ¶
func NewOutboxProcessor(store OutboxStore, opts ...ProcessorOption) *OutboxProcessor
NewOutboxProcessor creates a new OutboxProcessor.
func (*OutboxProcessor) IsRunning ¶
func (p *OutboxProcessor) IsRunning() bool
IsRunning returns true if the processor is running.
type OutboxRoute ¶
type OutboxRoute struct {
// EventTypes is the list of event types this route matches. Empty matches all.
EventTypes []string
// Destination is the target (e.g., "webhook:https://example.com/events", "kafka:orders").
Destination string
// Transform optionally transforms the event payload before outbox scheduling.
// Note: the event parameter is always nil because the outbox operates on raw serialized data.
// Use the stored parameter to access stream ID, event type, raw data, and metadata.
Transform func(event interface{}, stored StoredEvent) ([]byte, error)
// Filter optionally filters events. Return true to include the event.
// Note: the event parameter is always nil because the outbox operates on raw serialized data.
// Use the stored parameter to access stream ID, event type, raw data, and metadata.
Filter func(event interface{}, stored StoredEvent) bool
}
OutboxRoute defines routing rules for outbox messages.
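Because the event parameter is always nil, Filter and Transform have to work from the stored event alone. The sketch below shows one way to write such a route. StoredEvent is a trimmed local stand-in whose field names are assumptions, and matches is a hypothetical helper that applies the rules described in the comments above; how the library evaluates routes internally is not shown here.

```go
package main

import (
	"encoding/json"
	"strings"
)

// StoredEvent is a trimmed local stand-in; the field names are assumptions.
type StoredEvent struct {
	StreamID string
	Type     string
	Data     []byte
}

// OutboxRoute mirrors the struct documented above.
type OutboxRoute struct {
	EventTypes  []string
	Destination string
	Transform   func(event interface{}, stored StoredEvent) ([]byte, error)
	Filter      func(event interface{}, stored StoredEvent) bool
}

// orderRoute forwards OrderCreated events from "order-" streams to a
// hypothetical Kafka topic, wrapping the raw payload in an envelope.
func orderRoute() OutboxRoute {
	return OutboxRoute{
		EventTypes:  []string{"OrderCreated"},
		Destination: "kafka:orders",
		// event is always nil, so only stored is inspected.
		Filter: func(_ interface{}, stored StoredEvent) bool {
			return strings.HasPrefix(stored.StreamID, "order-")
		},
		Transform: func(_ interface{}, stored StoredEvent) ([]byte, error) {
			return json.Marshal(map[string]interface{}{
				"stream":  stored.StreamID,
				"type":    stored.Type,
				"payload": json.RawMessage(stored.Data),
			})
		},
	}
}

// matches treats an empty EventTypes list as "all types" and a nil Filter
// as "accept everything".
func matches(r OutboxRoute, stored StoredEvent) bool {
	if len(r.EventTypes) > 0 {
		found := false
		for _, t := range r.EventTypes {
			if t == stored.Type {
				found = true
				break
			}
		}
		if !found {
			return false
		}
	}
	return r.Filter == nil || r.Filter(nil, stored)
}
```

Embedding the payload as json.RawMessage keeps the already-serialized event data intact instead of encoding it a second time as a string.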
type OutboxStatus ¶
type OutboxStatus = adapters.OutboxStatus
OutboxStatus represents the current status of an outbox message.
type OutboxStore ¶
type OutboxStore = adapters.OutboxStore
OutboxStore defines the interface for outbox message persistence.
type PanicError ¶
type PanicError struct {
CommandType string
Value interface{}
Stack string
// CommandData contains a sanitized JSON representation of the command for debugging.
// Sensitive fields should be masked by the caller before setting this field.
CommandData string
}
PanicError provides detailed information about a handler panic.
func NewPanicError ¶
func NewPanicError(cmdType string, value interface{}, stack string) *PanicError
NewPanicError creates a new PanicError.
func NewPanicErrorWithCommand ¶
func NewPanicErrorWithCommand(cmdType string, value interface{}, stack string, commandData string) *PanicError
NewPanicErrorWithCommand creates a new PanicError with command data for debugging. The commandData should be a sanitized representation of the command (sensitive fields masked).
func (*PanicError) Is ¶
func (e *PanicError) Is(target error) bool
Is reports whether this error matches the target error.
func (*PanicError) Unwrap ¶
func (e *PanicError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
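The sketch below illustrates the general recover-and-wrap technique that a recovery middleware relies on. It is not the library's implementation: PanicError is re-declared locally, the Error format and the Unwrap behavior (returning the panic value only when it is itself an error) are assumptions, and safely is a hypothetical helper.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// PanicError mirrors the documented fields; the methods are a local sketch.
type PanicError struct {
	CommandType string
	Value       interface{}
	Stack       string
	CommandData string
}

func (e *PanicError) Error() string {
	return fmt.Sprintf("panic handling %s: %v", e.CommandType, e.Value)
}

// Unwrap exposes the panic value when it is itself an error (assumed behavior).
func (e *PanicError) Unwrap() error {
	if err, ok := e.Value.(error); ok {
		return err
	}
	return nil
}

// safely runs handler and converts a panic into a *PanicError that carries
// the stack trace captured at recovery time.
func safely(cmdType string, handler func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = &PanicError{
				CommandType: cmdType,
				Value:       r,
				Stack:       string(debug.Stack()),
			}
		}
	}()
	return handler()
}
```

The named return value is what allows the deferred function to replace the result after the handler has panicked.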
type ParallelRebuilder ¶
type ParallelRebuilder struct {
// contains filtered or unexported fields
}
ParallelRebuilder rebuilds multiple projections in parallel.
func NewParallelRebuilder ¶
func NewParallelRebuilder(rebuilder *ProjectionRebuilder, concurrency int) *ParallelRebuilder
NewParallelRebuilder creates a new parallel rebuilder.
func (*ParallelRebuilder) RebuildAll ¶
func (pr *ParallelRebuilder) RebuildAll(ctx context.Context, projections []AsyncProjection, opts ...RebuildOptions) error
RebuildAll rebuilds multiple async projections in parallel.
type PollingSubscription ¶
type PollingSubscription struct {
// contains filtered or unexported fields
}
PollingSubscription polls the event store for new events. This is a fallback when push-based subscriptions aren't available.
func NewPollingSubscription ¶
func NewPollingSubscription(store *EventStore, fromPosition uint64, opts ...SubscriptionOptions) *PollingSubscription
NewPollingSubscription creates a new polling subscription.
func (*PollingSubscription) Close ¶
func (s *PollingSubscription) Close() error
Close stops the subscription.
func (*PollingSubscription) Err ¶
func (s *PollingSubscription) Err() error
Err returns any error that caused the subscription to close.
func (*PollingSubscription) Events ¶
func (s *PollingSubscription) Events() <-chan StoredEvent
Events returns the channel for receiving events.
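A typical consumer ranges over Events and then checks Err. The sketch below shows that loop against a small interface so that it can run without a database. It assumes the subscription closes its channel when it stops, which this section does not state. StoredEvent.GlobalPosition, eventSource, consume, and sliceSource are hypothetical names used only here.

```go
package main

// StoredEvent is a trimmed local stand-in; GlobalPosition is an assumed field name.
type StoredEvent struct {
	GlobalPosition uint64
	Type           string
}

// eventSource captures the three methods PollingSubscription exposes.
type eventSource interface {
	Events() <-chan StoredEvent
	Err() error
	Close() error
}

// consume handles events until the channel is closed or handle fails. It
// returns the last position handled so a later subscription can resume there.
func consume(sub eventSource, from uint64, handle func(StoredEvent) error) (uint64, error) {
	last := from
	for ev := range sub.Events() {
		if err := handle(ev); err != nil {
			_ = sub.Close() // stop polling; the handler error takes priority
			return last, err
		}
		last = ev.GlobalPosition
	}
	// The channel is closed: report why the subscription ended, if known.
	return last, sub.Err()
}

// sliceSource is a test double that replays a fixed list of events.
type sliceSource struct {
	ch  chan StoredEvent
	err error
}

func newSliceSource(events []StoredEvent, err error) *sliceSource {
	ch := make(chan StoredEvent, len(events))
	for _, ev := range events {
		ch <- ev
	}
	close(ch)
	return &sliceSource{ch: ch, err: err}
}

func (s *sliceSource) Events() <-chan StoredEvent { return s.ch }
func (s *sliceSource) Err() error                 { return s.err }
func (s *sliceSource) Close() error               { return nil }
```

Persisting the returned position and passing it as fromPosition on restart is one way to avoid reprocessing events that were already handled.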
type ProcessorOption ¶
type ProcessorOption func(*OutboxProcessor)
ProcessorOption configures an OutboxProcessor.
func WithBatchSize ¶
func WithBatchSize(n int) ProcessorOption
WithBatchSize sets the maximum number of messages to process in a single batch.
func WithCleanupAge ¶
func WithCleanupAge(d time.Duration) ProcessorOption
WithCleanupAge sets the age threshold for cleaning up completed messages.
func WithCleanupInterval ¶
func WithCleanupInterval(d time.Duration) ProcessorOption
WithCleanupInterval sets how often completed messages are cleaned up.
func WithMaxRetries ¶
func WithMaxRetries(n int) ProcessorOption
WithMaxRetries sets the maximum number of delivery attempts.
func WithOutboxMetrics ¶
func WithOutboxMetrics(metrics OutboxMetrics) ProcessorOption
WithOutboxMetrics sets the metrics collector for the processor.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) ProcessorOption
WithPollInterval sets how often the processor polls for pending messages.
func WithProcessorLogger ¶
func WithProcessorLogger(logger Logger) ProcessorOption
WithProcessorLogger sets the logger for the processor.
func WithPublisher ¶
func WithPublisher(publisher Publisher) ProcessorOption
WithPublisher registers a publisher for a given destination prefix.
func WithRetryBackoff ¶
func WithRetryBackoff(d time.Duration) ProcessorOption
WithRetryBackoff sets the duration between retry cycles.
type ProgressCallback ¶
type ProgressCallback func(progress RebuildProgress)
ProgressCallback is called periodically during rebuild with progress updates.
type Projection ¶
type Projection interface {
// Name returns the unique identifier for this projection.
// This name is used for checkpointing and management.
Name() string
// HandledEvents returns the list of event types this projection handles.
// An empty list means the projection handles all event types.
HandledEvents() []string
}
Projection is the base interface for all projection types. Projections transform events into optimized read models.
type ProjectionBase ¶
type ProjectionBase struct {
// contains filtered or unexported fields
}
ProjectionBase provides a default partial implementation of Projection. Embed this struct in your projection types to get common functionality.
func NewProjectionBase ¶
func NewProjectionBase(name string, handledEvents ...string) ProjectionBase
NewProjectionBase creates a new ProjectionBase.
func (*ProjectionBase) HandledEvents ¶
func (p *ProjectionBase) HandledEvents() []string
HandledEvents returns the list of event types this projection handles.
func (*ProjectionBase) HandlesEvent ¶
func (p *ProjectionBase) HandlesEvent(eventType string) bool
HandlesEvent returns true if this projection handles the given event type.
func (*ProjectionBase) Name ¶
func (p *ProjectionBase) Name() string
Name returns the projection name.
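The sketch below shows the embedding pattern with a local re-implementation of ProjectionBase. Its unexported fields are assumptions, and HandlesEvent is assumed to treat an empty list as "all event types", following the Projection interface comment above. OrderCounter and its Apply method are hypothetical.

```go
package main

// ProjectionBase is a local stand-in; the real type's fields are unexported
// and not shown in this reference.
type ProjectionBase struct {
	name    string
	handled []string
}

func NewProjectionBase(name string, handledEvents ...string) ProjectionBase {
	return ProjectionBase{name: name, handled: handledEvents}
}

func (p *ProjectionBase) Name() string            { return p.name }
func (p *ProjectionBase) HandledEvents() []string { return p.handled }

// HandlesEvent treats an empty list as "handles every event type".
func (p *ProjectionBase) HandlesEvent(eventType string) bool {
	if len(p.handled) == 0 {
		return true
	}
	for _, t := range p.handled {
		if t == eventType {
			return true
		}
	}
	return false
}

// OrderCounter is a hypothetical projection that embeds ProjectionBase and
// therefore gets Name, HandledEvents, and HandlesEvent for free.
type OrderCounter struct {
	ProjectionBase
	Count int
}

func NewOrderCounter() *OrderCounter {
	return &OrderCounter{ProjectionBase: NewProjectionBase("order-counter", "OrderCreated")}
}

// Apply counts only the event types the projection declared interest in.
func (c *OrderCounter) Apply(eventType string) {
	if c.HandlesEvent(eventType) {
		c.Count++
	}
}
```

Embedding keeps the projection type focused on its read-model logic while the base supplies the naming and filtering methods.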
type ProjectionEngine ¶
type ProjectionEngine struct {
// contains filtered or unexported fields
}
ProjectionEngine manages the lifecycle of projections. It handles registration, starting, stopping, and monitoring projections.
func NewProjectionEngine ¶
func NewProjectionEngine(store *EventStore, opts ...ProjectionEngineOption) *ProjectionEngine
NewProjectionEngine creates a new ProjectionEngine.
func (*ProjectionEngine) GetAllStatuses ¶
func (e *ProjectionEngine) GetAllStatuses() []*ProjectionStatus
GetAllStatuses returns the status of all registered projections.
func (*ProjectionEngine) GetStatus ¶
func (e *ProjectionEngine) GetStatus(name string) (*ProjectionStatus, error)
GetStatus returns the status of a projection by name.
func (*ProjectionEngine) IsRunning ¶
func (e *ProjectionEngine) IsRunning() bool
IsRunning returns true if the engine is running.
func (*ProjectionEngine) NotifyLiveProjections ¶
func (e *ProjectionEngine) NotifyLiveProjections(ctx context.Context, events []StoredEvent)
NotifyLiveProjections notifies all live projections of new events.
func (*ProjectionEngine) ProcessInlineProjections ¶
func (e *ProjectionEngine) ProcessInlineProjections(ctx context.Context, events []StoredEvent) error
ProcessInlineProjections processes all inline projections for the given events. This is called by the event store after appending events.
func (*ProjectionEngine) RegisterAsync ¶
func (e *ProjectionEngine) RegisterAsync(projection AsyncProjection, opts ...AsyncOptions) error
RegisterAsync registers an async projection with the given options. Async projections are processed in background workers.
func (*ProjectionEngine) RegisterInline ¶
func (e *ProjectionEngine) RegisterInline(projection InlineProjection) error
RegisterInline registers an inline projection. Inline projections are processed synchronously with event appends.
func (*ProjectionEngine) RegisterLive ¶
func (e *ProjectionEngine) RegisterLive(projection LiveProjection, opts ...LiveOptions) error
RegisterLive registers a live projection with optional configuration. Live projections receive events in real-time.
func (*ProjectionEngine) Start ¶
func (e *ProjectionEngine) Start(ctx context.Context) error
Start starts the projection engine and all registered projections.
func (*ProjectionEngine) Stop ¶
func (e *ProjectionEngine) Stop(ctx context.Context) error
Stop gracefully stops the projection engine.
func (*ProjectionEngine) Unregister ¶
func (e *ProjectionEngine) Unregister(name string) error
Unregister removes a projection by name.
type ProjectionEngineOption ¶
type ProjectionEngineOption func(*ProjectionEngine)
ProjectionEngineOption configures a ProjectionEngine.
func WithCheckpointStore ¶
func WithCheckpointStore(store CheckpointStore) ProjectionEngineOption
WithCheckpointStore sets the checkpoint store for the engine.
func WithProjectionLogger ¶
func WithProjectionLogger(logger Logger) ProjectionEngineOption
WithProjectionLogger sets the logger for the engine.
func WithProjectionMetrics ¶
func WithProjectionMetrics(metrics ProjectionMetrics) ProjectionEngineOption
WithProjectionMetrics sets the metrics collector for the engine.
type ProjectionError ¶
ProjectionError provides detailed information about a projection failure.
func NewProjectionError ¶
func NewProjectionError(projectionName, eventType string, position uint64, cause error) *ProjectionError
NewProjectionError creates a new ProjectionError.
func (*ProjectionError) Error ¶
func (e *ProjectionError) Error() string
Error returns the error message.
func (*ProjectionError) Is ¶
func (e *ProjectionError) Is(target error) bool
Is reports whether this error matches the target error.
func (*ProjectionError) Unwrap ¶
func (e *ProjectionError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
type ProjectionMetrics ¶
type ProjectionMetrics interface {
// RecordEventProcessed records that an event was processed.
RecordEventProcessed(projectionName, eventType string, duration time.Duration, success bool)
// RecordBatchProcessed records that a batch of events was processed.
RecordBatchProcessed(projectionName string, count int, duration time.Duration, success bool)
// RecordCheckpoint records a checkpoint update.
RecordCheckpoint(projectionName string, position uint64)
// RecordError records a projection error.
RecordError(projectionName string, err error)
}
ProjectionMetrics collects metrics about projection processing.
type ProjectionRebuilder ¶
type ProjectionRebuilder struct {
// contains filtered or unexported fields
}
ProjectionRebuilder rebuilds projections from scratch. It replays all events through a projection to reconstruct its read model.
func NewProjectionRebuilder ¶
func NewProjectionRebuilder(store *EventStore, checkpointStore CheckpointStore, opts ...ProjectionRebuilderOption) *ProjectionRebuilder
NewProjectionRebuilder creates a new projection rebuilder.
func (*ProjectionRebuilder) RebuildAsync ¶
func (r *ProjectionRebuilder) RebuildAsync(ctx context.Context, projection AsyncProjection, opts ...RebuildOptions) error
RebuildAsync rebuilds an async projection from scratch.
func (*ProjectionRebuilder) RebuildInline ¶
func (r *ProjectionRebuilder) RebuildInline(ctx context.Context, projection InlineProjection, opts ...RebuildOptions) error
RebuildInline rebuilds an inline projection from scratch.
type ProjectionRebuilderOption ¶
type ProjectionRebuilderOption func(*ProjectionRebuilder)
ProjectionRebuilderOption configures a ProjectionRebuilder.
func WithRebuilderBatchSize ¶
func WithRebuilderBatchSize(size int) ProjectionRebuilderOption
WithRebuilderBatchSize sets the batch size for rebuilding.
func WithRebuilderLogger ¶
func WithRebuilderLogger(logger Logger) ProjectionRebuilderOption
WithRebuilderLogger sets the logger for the rebuilder.
func WithRebuilderMetrics ¶
func WithRebuilderMetrics(metrics ProjectionMetrics) ProjectionRebuilderOption
WithRebuilderMetrics sets the metrics collector for the rebuilder.
type ProjectionState ¶
type ProjectionState string
ProjectionState represents the current state of a projection.
const (
// ProjectionStateStopped indicates the projection is not running.
ProjectionStateStopped ProjectionState = "stopped"
// ProjectionStateRunning indicates the projection is actively processing events.
ProjectionStateRunning ProjectionState = "running"
// ProjectionStatePaused indicates the projection is paused.
ProjectionStatePaused ProjectionState = "paused"
// ProjectionStateFaulted indicates the projection has encountered an error.
ProjectionStateFaulted ProjectionState = "faulted"
// ProjectionStateRebuilding indicates the projection is being rebuilt.
ProjectionStateRebuilding ProjectionState = "rebuilding"
// ProjectionStateCatchingUp indicates the projection is catching up to current events.
ProjectionStateCatchingUp ProjectionState = "catching_up"
)
type ProjectionStatus ¶
type ProjectionStatus struct {
// Name is the projection name.
Name string
// State is the current state of the projection.
State ProjectionState
// LastPosition is the global position of the last processed event.
LastPosition uint64
// EventsProcessed is the total number of events processed.
EventsProcessed uint64
// LastProcessedAt is when the last event was processed.
LastProcessedAt time.Time
// Error contains the error message if the projection is faulted.
Error string
// Lag is the number of events behind the head of the event store.
Lag uint64
// AverageLatency is the average time to process an event.
AverageLatency time.Duration
}
ProjectionStatus provides detailed information about a projection's current state.
type Publisher ¶
type Publisher interface {
// Publish sends one or more messages to the external system.
Publish(ctx context.Context, messages []*OutboxMessage) error
// Destination returns the destination prefix this publisher handles (e.g., "webhook", "kafka", "sns").
Destination() string
}
Publisher publishes outbox messages to an external system.
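The sketch below shows what a Publisher implementation and prefix-based dispatch might look like. OutboxMessage is a trimmed local stand-in with assumed field names, and memoryPublisher, dispatch, and publishDemo are hypothetical. The "prefix:target" destination format is inferred from the examples in this reference ("kafka:orders", "webhook:https://...").

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// OutboxMessage is a trimmed local stand-in; field names are assumptions.
type OutboxMessage struct {
	ID          string
	Destination string
	Payload     []byte
}

// Publisher mirrors the interface documented above.
type Publisher interface {
	Publish(ctx context.Context, messages []*OutboxMessage) error
	Destination() string
}

// memoryPublisher is a hypothetical publisher that records message IDs per
// topic, where the topic is the destination without its "memory:" prefix.
type memoryPublisher struct {
	sent map[string][]string
}

func (p *memoryPublisher) Destination() string { return "memory" }

func (p *memoryPublisher) Publish(ctx context.Context, messages []*OutboxMessage) error {
	for _, m := range messages {
		if err := ctx.Err(); err != nil {
			return err // stop early if the caller cancelled
		}
		topic := strings.TrimPrefix(m.Destination, p.Destination()+":")
		p.sent[topic] = append(p.sent[topic], m.ID)
	}
	return nil
}

// dispatch groups messages by destination prefix and hands each group to the
// publisher registered for that prefix. Unknown prefixes fail before any
// message is published.
func dispatch(ctx context.Context, pubs []Publisher, messages []*OutboxMessage) error {
	byPrefix := make(map[string]Publisher, len(pubs))
	for _, p := range pubs {
		byPrefix[p.Destination()] = p
	}
	groups := make(map[string][]*OutboxMessage)
	for _, m := range messages {
		prefix, _, _ := strings.Cut(m.Destination, ":")
		if _, ok := byPrefix[prefix]; !ok {
			return fmt.Errorf("no publisher for destination %q", m.Destination)
		}
		groups[prefix] = append(groups[prefix], m)
	}
	for prefix, group := range groups {
		if err := byPrefix[prefix].Publish(ctx, group); err != nil {
			return err
		}
	}
	return nil
}

// publishDemo builds one message per destination and dispatches them all.
func publishDemo(destinations ...string) (map[string][]string, error) {
	pub := &memoryPublisher{sent: map[string][]string{}}
	var msgs []*OutboxMessage
	for i, d := range destinations {
		msgs = append(msgs, &OutboxMessage{ID: fmt.Sprintf("msg-%d", i+1), Destination: d})
	}
	err := dispatch(context.Background(), []Publisher{pub}, msgs)
	return pub.sent, err
}
```

Passing a whole batch to Publish lets an implementation use the bulk API of its target system where one exists.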
type Query ¶
type Query struct {
// Filters to apply.
Filters []Filter
// Ordering criteria.
OrderBy []OrderBy
// Maximum number of results to return.
// 0 means no limit.
Limit int
// Number of results to skip.
Offset int
// IncludeCount includes the total count in paginated results.
IncludeCount bool
}
Query represents a query for read models.
func (*Query) OrderByAsc ¶
OrderByAsc adds ascending order.
func (*Query) OrderByDesc ¶
OrderByDesc adds descending order.
func (*Query) WithOffset ¶
WithOffset sets the number of results to skip.
func (*Query) WithPagination ¶
WithPagination sets limit and offset for pagination.
type QueryResult ¶
type QueryResult[T any] struct {
// Items contains the matching read models.
Items []*T
// TotalCount is the total number of matching items (before pagination).
// Only populated if IncludeCount was true in the query.
TotalCount int64
// HasMore indicates if there are more results beyond the limit.
HasMore bool
}
QueryResult contains query results with optional count.
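To make the relationship between Limit, Offset, IncludeCount, and the result fields concrete, the sketch below paginates an in-memory slice. The paginate helper is hypothetical and only illustrates the documented semantics (a limit of 0 means no limit, and TotalCount is filled only on request). A real repository would push this work down to the database.

```go
package main

// QueryResult mirrors the generic struct documented above.
type QueryResult[T any] struct {
	Items      []*T
	TotalCount int64
	HasMore    bool
}

// paginate applies offset and limit to all, which is assumed to be the full,
// already filtered and ordered result set.
func paginate[T any](all []*T, limit, offset int, includeCount bool) QueryResult[T] {
	if offset < 0 {
		offset = 0
	}
	if offset > len(all) {
		offset = len(all)
	}
	end := len(all)
	if limit > 0 && offset+limit < end {
		end = offset + limit // a limit of 0 leaves end at len(all)
	}
	res := QueryResult[T]{
		Items:   all[offset:end],
		HasMore: end < len(all),
	}
	if includeCount {
		res.TotalCount = int64(len(all))
	}
	return res
}
```

For three matching items, a limit of 2 at offset 0 yields two items with HasMore set, and the same limit at offset 2 yields the final item with HasMore unset.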
type ReadModelID ¶
type ReadModelID interface {
// GetID returns the read model's unique identifier.
GetID() string
}
ReadModelID is an interface for read models that can return their ID.
type ReadModelRepository ¶
type ReadModelRepository[T any] interface {
// Get retrieves a read model by ID.
// Returns ErrNotFound if not found.
Get(ctx context.Context, id string) (*T, error)
// GetMany retrieves multiple read models by their IDs.
// Missing IDs are silently skipped.
GetMany(ctx context.Context, ids []string) ([]*T, error)
// Find queries read models with the given criteria.
Find(ctx context.Context, query Query) ([]*T, error)
// FindOne returns the first read model matching the query.
// Returns ErrNotFound if no match.
FindOne(ctx context.Context, query Query) (*T, error)
// Count returns the number of read models matching the query.
Count(ctx context.Context, query Query) (int64, error)
// Insert creates a new read model.
// Returns ErrAlreadyExists if ID already exists.
Insert(ctx context.Context, model *T) error
// Update modifies an existing read model.
// Returns ErrNotFound if not found.
Update(ctx context.Context, id string, updateFn func(*T)) error
// Upsert creates or updates a read model.
Upsert(ctx context.Context, model *T) error
// Delete removes a read model by ID.
// Returns ErrNotFound if not found.
Delete(ctx context.Context, id string) error
// DeleteMany removes all read models matching the query.
// Returns the number of deleted models.
DeleteMany(ctx context.Context, query Query) (int64, error)
// Clear removes all read models.
Clear(ctx context.Context) error
}
ReadModelRepository provides generic CRUD operations for read models. T is the read model type.
type RebuildOptions ¶
type RebuildOptions struct {
// DeleteCheckpoint deletes the existing checkpoint before rebuilding.
// Default: true
DeleteCheckpoint bool
// ClearReadModel calls the projection's Clear method before rebuilding.
// Only applicable for projections that implement Clearable.
// Default: true
ClearReadModel bool
// ProgressCallback is called periodically with progress updates.
ProgressCallback ProgressCallback
// ProgressInterval is how often to call the progress callback.
// Default: 1 second
ProgressInterval time.Duration
// FromPosition starts rebuilding from a specific position.
// Default: 0 (from beginning)
FromPosition uint64
// ToPosition stops rebuilding at a specific position.
// Default: 0 (to end)
ToPosition uint64
}
RebuildOptions configures a projection rebuild.
func DefaultRebuildOptions ¶
func DefaultRebuildOptions() RebuildOptions
DefaultRebuildOptions returns the default rebuild options.
type RebuildProgress ¶
type RebuildProgress struct {
// ProjectionName is the name of the projection being rebuilt.
ProjectionName string
// TotalEvents is the total number of events to process.
TotalEvents uint64
// ProcessedEvents is the number of events processed so far.
ProcessedEvents uint64
// CurrentPosition is the current global position.
CurrentPosition uint64
// StartedAt is when the rebuild started.
StartedAt time.Time
// Duration is the elapsed time.
Duration time.Duration
// EventsPerSecond is the processing rate.
EventsPerSecond float64
// EstimatedRemaining is the estimated time remaining.
EstimatedRemaining time.Duration
// Completed indicates if the rebuild is complete.
Completed bool
// Error contains any error that occurred.
Error error
}
RebuildProgress tracks the progress of a projection rebuild.
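The derived fields can be computed from the counters and the elapsed time. The sketch below shows one plausible calculation using a trimmed local copy of the struct. progressAt is a hypothetical helper, and the library may compute or smooth these values differently.

```go
package main

import "time"

// RebuildProgress is a trimmed local copy of the struct documented above.
type RebuildProgress struct {
	ProjectionName     string
	TotalEvents        uint64
	ProcessedEvents    uint64
	Duration           time.Duration
	EventsPerSecond    float64
	EstimatedRemaining time.Duration
	Completed          bool
}

// ProgressCallback matches the documented callback signature.
type ProgressCallback func(progress RebuildProgress)

// progressAt derives the rate and the remaining-time estimate from the raw
// counters and the elapsed time.
func progressAt(name string, total, processed uint64, elapsed time.Duration) RebuildProgress {
	p := RebuildProgress{
		ProjectionName:  name,
		TotalEvents:     total,
		ProcessedEvents: processed,
		Duration:        elapsed,
		Completed:       processed >= total,
	}
	if secs := elapsed.Seconds(); secs > 0 {
		p.EventsPerSecond = float64(processed) / secs
	}
	// Only estimate when there is a measured rate and work left to do.
	if p.EventsPerSecond > 0 && processed < total {
		remaining := float64(total-processed) / p.EventsPerSecond
		p.EstimatedRemaining = time.Duration(remaining * float64(time.Second))
	}
	return p
}
```

For example, 250 of 1000 events processed in 10 seconds gives a rate of 25 events per second and an estimate of 30 seconds remaining.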
type RetryConfig ¶
type RetryConfig struct {
// MaxAttempts is the maximum number of attempts (including the first one).
MaxAttempts int
// InitialDelay is the initial delay between retries.
InitialDelay time.Duration
// MaxDelay is the maximum delay between retries.
MaxDelay time.Duration
// Multiplier is the factor by which the delay increases on each retry.
Multiplier float64
// ShouldRetry determines if an error should be retried.
// If nil, all errors are retried.
ShouldRetry func(err error) bool
}
RetryConfig configures how RetryMiddleware retries failed commands.
func DefaultRetryConfig ¶
func DefaultRetryConfig() RetryConfig
DefaultRetryConfig returns a default retry configuration.
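The fields of RetryConfig describe a capped exponential backoff. The sketch below shows how they might interact, using a local copy of the struct. delayBefore, retry, errTransient, and errPermanent are hypothetical, the values returned by DefaultRetryConfig are not reproduced here, and a real middleware would also stop waiting when the command's context is cancelled.

```go
package main

import (
	"errors"
	"time"
)

// RetryConfig mirrors the struct documented above.
type RetryConfig struct {
	MaxAttempts  int
	InitialDelay time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
	ShouldRetry  func(err error) bool
}

// Hypothetical errors used to demonstrate ShouldRetry.
var (
	errTransient = errors.New("transient failure")
	errPermanent = errors.New("permanent failure")
)

// delayBefore returns the wait before retry number n (n = 1 is the first
// retry): InitialDelay * Multiplier^(n-1), capped at MaxDelay when set.
func delayBefore(cfg RetryConfig, n int) time.Duration {
	d := float64(cfg.InitialDelay)
	for i := 1; i < n; i++ {
		d *= cfg.Multiplier
		if cfg.MaxDelay > 0 && d >= float64(cfg.MaxDelay) {
			return cfg.MaxDelay
		}
	}
	if cfg.MaxDelay > 0 && d > float64(cfg.MaxDelay) {
		return cfg.MaxDelay
	}
	return time.Duration(d)
}

// retry calls op up to MaxAttempts times (counting the first call) and
// sleeps between attempts. A nil ShouldRetry retries every error.
func retry(cfg RetryConfig, op func() error) error {
	attempts := cfg.MaxAttempts
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for attempt := 1; attempt <= attempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if cfg.ShouldRetry != nil && !cfg.ShouldRetry(err) {
			return err // not retryable: give up immediately
		}
		if attempt < attempts {
			time.Sleep(delayBefore(cfg, attempt))
		}
	}
	return err
}
```

With InitialDelay of 100ms, Multiplier of 2, and MaxDelay of 1s, the waits would be 100ms, 200ms, 400ms, 800ms, and then 1s for every later retry.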
type RetryPolicy ¶
type RetryPolicy interface {
// ShouldRetry returns true if the operation should be retried.
ShouldRetry(attempt int, err error) bool
// Delay returns the duration to wait before the next retry.
Delay(attempt int) time.Duration
}
RetryPolicy defines how to handle retries for failed operations.
func ExponentialBackoffRetry ¶
func ExponentialBackoffRetry(maxRetries int, baseDelay, maxDelay time.Duration) RetryPolicy
ExponentialBackoffRetry creates a new retry policy with exponential backoff.
type Saga ¶
type Saga interface {
// SagaID returns the unique identifier for this saga instance.
SagaID() string
// SagaType returns the type of this saga (e.g., "OrderFulfillment").
SagaType() string
// Status returns the current status of the saga.
Status() SagaStatus
// SetStatus sets the saga status.
SetStatus(status SagaStatus)
// CurrentStep returns the current step number (0-based).
CurrentStep() int
// SetCurrentStep sets the current step number.
SetCurrentStep(step int)
// CorrelationID returns the correlation ID for this saga.
// Used to correlate events to this saga instance.
CorrelationID() string
// SetCorrelationID sets the correlation ID.
SetCorrelationID(id string)
// HandledEvents returns the list of event types this saga handles.
HandledEvents() []string
// HandleEvent processes an event and returns commands to dispatch.
// The returned commands will be executed by the saga manager.
HandleEvent(ctx context.Context, event StoredEvent) ([]Command, error)
// Compensate is called when the saga needs to rollback.
// It returns compensating commands to undo previous steps.
Compensate(ctx context.Context, failedStep int, failureReason error) ([]Command, error)
// IsComplete returns true if the saga has completed successfully.
IsComplete() bool
// StartedAt returns when the saga started.
StartedAt() time.Time
// SetStartedAt sets when the saga started.
SetStartedAt(t time.Time)
// CompletedAt returns when the saga completed (nil if not completed).
CompletedAt() *time.Time
// SetCompletedAt sets when the saga completed.
SetCompletedAt(t *time.Time)
// Data returns the saga's internal state as a map.
// This is serialized and stored in the saga store.
Data() map[string]interface{}
// SetData restores the saga's internal state from a map.
SetData(data map[string]interface{})
// Version returns the saga version for optimistic concurrency.
Version() int64
// SetVersion sets the saga version.
SetVersion(v int64)
// IncrementVersion increments the saga version.
IncrementVersion()
}
Saga defines the interface for saga implementations. A saga coordinates long-running business processes across multiple aggregates.
type SagaBase ¶
type SagaBase struct {
// contains filtered or unexported fields
}
SagaBase provides a default partial implementation of the Saga interface. Embed this struct in your saga types to get default behavior.
func NewSagaBase ¶
NewSagaBase creates a new SagaBase with the given ID and type.
func (*SagaBase) CompletedAt ¶
CompletedAt returns when the saga completed.
func (*SagaBase) CorrelationID ¶
CorrelationID returns the correlation ID.
func (*SagaBase) CurrentStep ¶
CurrentStep returns the current step number.
func (*SagaBase) IncrementVersion ¶
func (s *SagaBase) IncrementVersion()
IncrementVersion increments the saga version.
func (*SagaBase) MarkCompensated ¶
func (s *SagaBase) MarkCompensated()
MarkCompensated marks the saga as compensated.
func (*SagaBase) SetCompletedAt ¶
SetCompletedAt sets when the saga completed.
func (*SagaBase) SetCorrelationID ¶
SetCorrelationID sets the correlation ID.
func (*SagaBase) SetCurrentStep ¶
SetCurrentStep sets the current step number.
func (*SagaBase) SetStartedAt ¶
SetStartedAt sets when the saga started.
func (*SagaBase) SetStatus ¶
func (s *SagaBase) SetStatus(status SagaStatus)
SetStatus sets the saga status.
func (*SagaBase) SetVersion ¶
SetVersion sets the saga version.
func (*SagaBase) StartCompensation ¶
func (s *SagaBase) StartCompensation()
StartCompensation marks the saga as compensating.
type SagaCorrelation ¶
type SagaCorrelation struct {
// SagaType is the type of saga this correlation applies to.
SagaType string
// StartingEvents are the event types that can start this saga.
StartingEvents []string
// CorrelationIDFunc extracts the correlation ID from an event.
// This is used to find existing sagas or create new ones.
CorrelationIDFunc func(event StoredEvent) string
}
SagaCorrelation provides strategies for correlating events to sagas.
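The sketch below shows how a correlation might be defined and how a manager could use it to decide what to do with an incoming event. StoredEvent is a trimmed stand-in with assumed fields, the "correlationId" metadata key is hypothetical, and decide only illustrates the "find existing sagas or create new ones" behavior described above. It is not the SagaManager's actual routing code.

```go
package main

// StoredEvent is a trimmed local stand-in; the field names are assumptions.
type StoredEvent struct {
	StreamID string
	Type     string
	Metadata map[string]string
}

// SagaCorrelation mirrors the struct documented above.
type SagaCorrelation struct {
	SagaType          string
	StartingEvents    []string
	CorrelationIDFunc func(event StoredEvent) string
}

// orderFulfillment correlates by a hypothetical "correlationId" metadata key
// and falls back to the stream ID when the key is absent.
func orderFulfillment() SagaCorrelation {
	return SagaCorrelation{
		SagaType:       "OrderFulfillment",
		StartingEvents: []string{"OrderCreated"},
		CorrelationIDFunc: func(event StoredEvent) string {
			if id := event.Metadata["correlationId"]; id != "" {
				return id
			}
			return event.StreamID
		},
	}
}

// decide returns "deliver" when a saga with the event's correlation ID is
// already active, "start" when the event is a starting event, and "ignore"
// otherwise. It also returns the correlation ID it computed.
func decide(c SagaCorrelation, active map[string]bool, event StoredEvent) (string, string) {
	id := c.CorrelationIDFunc(event)
	if active[id] {
		return "deliver", id
	}
	for _, t := range c.StartingEvents {
		if t == event.Type {
			return "start", id
		}
	}
	return "ignore", id
}
```

Checking for an active saga before checking StartingEvents means a repeated starting event is delivered to the existing saga rather than creating a second one.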
type SagaFailedError ¶
type SagaFailedError struct {
SagaID string
SagaType string
FailedStep int
Reason string
Recoverable bool
}
SagaFailedError provides detailed information about a saga failure.
func NewSagaFailedError ¶
func NewSagaFailedError(sagaID, sagaType string, failedStep int, reason string, recoverable bool) *SagaFailedError
NewSagaFailedError creates a new SagaFailedError.
func (*SagaFailedError) Error ¶
func (e *SagaFailedError) Error() string
Error returns the error message.
func (*SagaFailedError) Is ¶
func (e *SagaFailedError) Is(target error) bool
Is reports whether this error matches the target error.
type SagaManager ¶
type SagaManager struct {
// contains filtered or unexported fields
}
SagaManager orchestrates saga lifecycle and event processing. It subscribes to events, routes them to appropriate sagas, and dispatches resulting commands.
Concurrency and Idempotency ¶
SagaManager provides several mechanisms to ensure correct saga processing under concurrent access:
Per-Saga Locking: Each saga ID has an associated mutex that serializes access. This prevents race conditions when the same event is delivered from multiple sources (e.g., pg_notify + polling) or when multiple events for the same saga arrive simultaneously.
Fresh State Loading: Before processing each event, the saga state is loaded fresh from the store. This ensures terminal status checks and idempotency checks see the latest state.
Event Idempotency: Processed events are tracked in the SagaState.ProcessedEvents field (not in the saga's Data map) to prevent duplicate processing on retries. This is handled transparently by the SagaManager - saga implementations don't need to preserve these internal tracking fields.
Optimistic Concurrency: The saga store uses version-based optimistic concurrency control. On conflict, the event is retried with fresh state.
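Two of the mechanisms above, per-saga locking and event idempotency, can be sketched in a few lines. The code below is an illustration of the general technique only. manager, sagaState, process, handledCount, and deliverConcurrently are hypothetical names, the in-memory map stands in for the saga store, and optimistic concurrency is left out.

```go
package main

import "sync"

// sagaState is a stand-in for persisted saga state with processed-event tracking.
type sagaState struct {
	processed map[string]bool
	handled   int
}

// manager keeps one mutex per saga ID plus an in-memory "store".
type manager struct {
	mu     sync.Mutex             // guards the two maps below
	locks  map[string]*sync.Mutex // one mutex per saga ID
	states map[string]*sagaState  // stand-in for the saga store
}

func newManager() *manager {
	return &manager{locks: map[string]*sync.Mutex{}, states: map[string]*sagaState{}}
}

// lockFor returns the mutex for a saga, creating it on first use.
func (m *manager) lockFor(sagaID string) *sync.Mutex {
	m.mu.Lock()
	defer m.mu.Unlock()
	l, ok := m.locks[sagaID]
	if !ok {
		l = &sync.Mutex{}
		m.locks[sagaID] = l
	}
	return l
}

// load fetches the saga state, creating an empty one if none exists.
func (m *manager) load(sagaID string) *sagaState {
	m.mu.Lock()
	defer m.mu.Unlock()
	st, ok := m.states[sagaID]
	if !ok {
		st = &sagaState{processed: map[string]bool{}}
		m.states[sagaID] = st
	}
	return st
}

// process applies an event at most once per saga and reports whether it was
// applied. Events for different sagas do not block each other.
func (m *manager) process(sagaID, eventID string) bool {
	l := m.lockFor(sagaID)
	l.Lock()
	defer l.Unlock()
	st := m.load(sagaID) // load state only after taking the saga's lock
	if st.processed[eventID] {
		return false // duplicate delivery: skip
	}
	st.processed[eventID] = true
	st.handled++
	return true
}

// handledCount reads the number of applied events under the saga's lock.
func (m *manager) handledCount(sagaID string) int {
	l := m.lockFor(sagaID)
	l.Lock()
	defer l.Unlock()
	return m.load(sagaID).handled
}

// deliverConcurrently delivers the same event n times from n goroutines and
// returns how many deliveries were actually applied.
func deliverConcurrently(m *manager, sagaID, eventID string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	applied := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if m.process(sagaID, eventID) {
				mu.Lock()
				applied++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return applied
}
```

Loading the state after acquiring the lock is what makes the duplicate check reliable: a second delivery of the same event always sees the first delivery's update.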
func NewSagaManager ¶
func NewSagaManager(eventStore *EventStore, opts ...SagaManagerOption) *SagaManager
NewSagaManager creates a new SagaManager.
func (*SagaManager) FindSagaByCorrelationID ¶
func (m *SagaManager) FindSagaByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)
FindSagaByCorrelationID finds a saga by its correlation ID.
func (*SagaManager) IsRunning ¶
func (m *SagaManager) IsRunning() bool
IsRunning returns true if the saga manager is running.
func (*SagaManager) Position ¶
func (m *SagaManager) Position() uint64
Position returns the current event position.
func (*SagaManager) ProcessEvent ¶
func (m *SagaManager) ProcessEvent(ctx context.Context, event StoredEvent) error
ProcessEvent manually processes a single event (for testing or manual replay).
func (*SagaManager) Register ¶
func (m *SagaManager) Register(sagaType string, factory SagaFactory, correlation SagaCorrelation)
Register registers a saga type with its factory and correlation configuration.
func (*SagaManager) RegisterSimple ¶
func (m *SagaManager) RegisterSimple(sagaType string, factory SagaFactory, startingEvents ...string)
RegisterSimple registers a saga with a simple correlation based on event stream ID.
func (*SagaManager) SetPosition ¶
func (m *SagaManager) SetPosition(pos uint64)
SetPosition sets the starting position for event processing.
func (*SagaManager) Start ¶
func (m *SagaManager) Start(ctx context.Context) error
Start begins processing events and routing them to sagas. This method blocks until the context is cancelled.
func (*SagaManager) StartAsync ¶
func (m *SagaManager) StartAsync(ctx context.Context) *AsyncResult
StartAsync begins processing events and routing them to sagas in a background goroutine. It returns immediately with an AsyncResult that can be used to:
- Wait for the saga manager to stop: result.Wait()
- Wait with timeout: result.WaitWithTimeout(5 * time.Second)
- Check if stopped: result.IsComplete()
- Cancel the manager: result.Cancel()
- Get the error: result.Err()
The saga manager will continue processing until:
- The provided context is cancelled
- result.Cancel() is called
- An unrecoverable error occurs
Example:
result := manager.StartAsync(ctx)
// Do other work while saga manager runs in background...
// Later, when shutting down:
result.Cancel()
if err := result.WaitWithTimeout(10 * time.Second); err != nil {
log.Printf("Saga manager shutdown: %v", err)
}
func (*SagaManager) StartSaga ¶
func (m *SagaManager) StartSaga(ctx context.Context, sagaType string, triggerEvent StoredEvent) error
StartSaga manually triggers a new saga instance synchronously. The saga will be created and the trigger event will be processed immediately.
This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.
func (*SagaManager) StartSagaAsync ¶
func (m *SagaManager) StartSagaAsync(ctx context.Context, sagaType string, triggerEvent StoredEvent) *AsyncResult
StartSagaAsync manually triggers a new saga instance asynchronously. The saga will be started and the first event will be processed in a background goroutine. Returns an AsyncResult that can be used to wait for the initial processing to complete.
This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.
Example:
result := manager.StartSagaAsync(ctx, "OrderFulfillment", initialEvent)
if err := result.WaitWithTimeout(5 * time.Second); err != nil {
log.Printf("Failed to start saga: %v", err)
}
type SagaManagerOption ¶
type SagaManagerOption func(*SagaManager)
SagaManagerOption configures a SagaManager.
func WithCommandBus ¶
func WithCommandBus(bus *CommandBus) SagaManagerOption
WithCommandBus sets the command bus for dispatching commands.
func WithSagaLogger ¶
func WithSagaLogger(logger Logger) SagaManagerOption
WithSagaLogger sets the logger used by the saga manager.
func WithSagaPollInterval ¶
func WithSagaPollInterval(d time.Duration) SagaManagerOption
WithSagaPollInterval sets the polling interval for event subscription.
func WithSagaRetryAttempts ¶
func WithSagaRetryAttempts(attempts int) SagaManagerOption
WithSagaRetryAttempts sets the number of retry attempts for failed commands.
func WithSagaRetryDelay ¶
func WithSagaRetryDelay(d time.Duration) SagaManagerOption
WithSagaRetryDelay sets the delay between retry attempts.
func WithSagaSerializer ¶
func WithSagaSerializer(serializer Serializer) SagaManagerOption
WithSagaSerializer sets the serializer for saga data.
func WithSagaStore ¶
func WithSagaStore(store SagaStore) SagaManagerOption
WithSagaStore sets the saga store.
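The options above follow Go's functional-options pattern: each one returns a function that mutates the manager during construction. A minimal sketch of that pattern is shown below. It uses a hypothetical stand-in type (sagaManager), hypothetical option names, and illustrative defaults, because the real SagaManager fields are unexported and its defaults are not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative defaults only; the library's real defaults are not documented here.
const (
	defaultPollInterval  = time.Second
	defaultRetryAttempts = 3
)

// sagaManager is a hypothetical stand-in for SagaManager, whose fields are unexported.
type sagaManager struct {
	pollInterval  time.Duration
	retryAttempts int
}

// option has the same shape as SagaManagerOption: a function that mutates the manager.
type option func(*sagaManager)

func withPollInterval(d time.Duration) option {
	return func(m *sagaManager) { m.pollInterval = d }
}

func withRetryAttempts(n int) option {
	return func(m *sagaManager) { m.retryAttempts = n }
}

// newSagaManager applies defaults first, then each option in order, so later
// options override earlier ones.
func newSagaManager(opts ...option) *sagaManager {
	m := &sagaManager{pollInterval: defaultPollInterval, retryAttempts: defaultRetryAttempts}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newSagaManager(withPollInterval(250*time.Millisecond), withRetryAttempts(5))
	fmt.Println(m.pollInterval, m.retryAttempts)
}
```

Options that are not passed leave the corresponding default untouched, which is why callers only need to name the settings they want to change.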
type SagaNotFoundError ¶
type SagaNotFoundError = adapters.SagaNotFoundError
SagaNotFoundError provides detailed information about a missing saga. This is a type alias to adapters.SagaNotFoundError for consistency.
type SagaState ¶
SagaState represents the persisted state of a saga.
func SagaStateFromJSON ¶
SagaStateFromJSON parses saga state from JSON.
type SagaStatus ¶
type SagaStatus = adapters.SagaStatus
SagaStatus represents the current status of a saga.
type SagaStepStatus ¶
type SagaStepStatus = adapters.SagaStepStatus
SagaStepStatus represents the status of a saga step.
type SchemaCompatibility ¶
type SchemaCompatibility int
SchemaCompatibility represents the level of backward compatibility between schema versions.
const (
// SchemaFullyCompatible indicates the schema change is fully backward and forward compatible.
// No fields were added, removed, or changed; only documentation or ordering changes.
SchemaFullyCompatible SchemaCompatibility = iota
// SchemaBackwardCompatible indicates old data can be read by new code.
// Fields may have been added (with defaults) but none removed or changed.
SchemaBackwardCompatible
// SchemaForwardCompatible indicates new data can be read by old code.
// Fields may have been removed but none added or changed.
SchemaForwardCompatible
// SchemaBreaking indicates the schema change breaks compatibility.
// Fields were changed, renamed, or removed without migration support.
SchemaBreaking
)
func (SchemaCompatibility) String ¶
func (c SchemaCompatibility) String() string
String returns a human-readable name for the compatibility level.
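As a sketch of how a caller might act on a compatibility level, the example below redeclares the enum locally so it compiles on its own. The helper canReadOldEvents is hypothetical and simply applies the definitions given in the constant comments: only the fully compatible and backward compatible levels promise that new code can read old data.

```go
package main

import "fmt"

// SchemaCompatibility is redeclared from the documentation so the sketch is self-contained.
type SchemaCompatibility int

const (
	SchemaFullyCompatible SchemaCompatibility = iota
	SchemaBackwardCompatible
	SchemaForwardCompatible
	SchemaBreaking
)

// canReadOldEvents is a hypothetical helper. It reports whether the level
// guarantees that new code can still read events written under the old schema.
func canReadOldEvents(c SchemaCompatibility) bool {
	switch c {
	case SchemaFullyCompatible, SchemaBackwardCompatible:
		return true
	default:
		return false
	}
}

func main() {
	levels := []SchemaCompatibility{
		SchemaFullyCompatible,
		SchemaBackwardCompatible,
		SchemaForwardCompatible,
		SchemaBreaking,
	}
	for _, c := range levels {
		fmt.Printf("level %d: new code reads old events = %v\n", int(c), canReadOldEvents(c))
	}
}
```

In an event-sourced system old events are never rewritten, so a check along these lines is one plausible gate before registering a new schema version.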
type SchemaDefinition ¶
type SchemaDefinition struct {
// Version is the schema version number.
Version int
// Fields describes the fields in this schema version.
Fields []FieldDefinition
// JSONSchema is an optional JSON Schema document for validation.
JSONSchema json.RawMessage
// RegisteredAt is when this schema version was registered.
RegisteredAt time.Time
}
SchemaDefinition describes the schema for a specific version of an event type.
type SchemaRegistry ¶
type SchemaRegistry struct {
// contains filtered or unexported fields
}
SchemaRegistry is an in-memory registry that tracks event schemas and their versions. It provides compatibility checking between schema versions.
func NewSchemaRegistry ¶
func NewSchemaRegistry() *SchemaRegistry
NewSchemaRegistry creates a new empty SchemaRegistry.
func (*SchemaRegistry) CheckCompatibility ¶
func (r *SchemaRegistry) CheckCompatibility(eventType string, oldVersion, newVersion int) (SchemaCompatibility, error)
CheckCompatibility determines the compatibility level between two schema versions. It compares the field definitions to classify the change.
func (*SchemaRegistry) GetLatestVersion ¶
func (r *SchemaRegistry) GetLatestVersion(eventType string) (int, error)
GetLatestVersion returns the highest registered schema version for an event type.
func (*SchemaRegistry) GetSchema ¶
func (r *SchemaRegistry) GetSchema(eventType string, version int) (*SchemaDefinition, error)
GetSchema retrieves a specific schema version for an event type.
func (*SchemaRegistry) Register ¶
func (r *SchemaRegistry) Register(eventType string, schema SchemaDefinition) error
Register adds a schema definition for an event type. Returns an error if the version is < 1 or already registered.
func (*SchemaRegistry) RegisteredEventTypes ¶
func (r *SchemaRegistry) RegisteredEventTypes() []string
RegisteredEventTypes returns a sorted list of event types that have schemas registered.
type SchemaVersionGapError ¶
SchemaVersionGapError provides detailed information about a gap in the upcaster chain.
func NewSchemaVersionGapError ¶
func NewSchemaVersionGapError(eventType string, missingVersion, expectedVersion int) *SchemaVersionGapError
NewSchemaVersionGapError creates a new SchemaVersionGapError.
func (*SchemaVersionGapError) Error ¶
func (e *SchemaVersionGapError) Error() string
Error returns the error message.
func (*SchemaVersionGapError) Is ¶
func (e *SchemaVersionGapError) Is(target error) bool
Is reports whether this error matches the target error.
func (*SchemaVersionGapError) Unwrap ¶
func (e *SchemaVersionGapError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type SerializationError ¶
type SerializationError struct {
EventType string
Operation string // "serialize" or "deserialize"
Cause error
}
SerializationError provides detailed information about a serialization failure.
func NewSerializationError ¶
func NewSerializationError(eventType, operation string, cause error) *SerializationError
NewSerializationError creates a new SerializationError.
func (*SerializationError) Error ¶
func (e *SerializationError) Error() string
Error returns the error message.
func (*SerializationError) Is ¶
func (e *SerializationError) Is(target error) bool
Is reports whether this error matches the target error.
func (*SerializationError) Unwrap ¶
func (e *SerializationError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
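Because SerializationError exposes Unwrap, it works with the standard errors.As and errors.Is helpers even when wrapped by other errors. The sketch below redeclares the struct locally. The Error message format, the errBadPayload sentinel, and the helper functions are assumptions made for illustration, not the library's own.

```go
package main

import (
	"errors"
	"fmt"
)

// SerializationError mirrors the documented fields.
type SerializationError struct {
	EventType string
	Operation string // "serialize" or "deserialize"
	Cause     error
}

// Error uses an assumed message format; the library's wording may differ.
func (e *SerializationError) Error() string {
	return fmt.Sprintf("failed to %s event %q: %v", e.Operation, e.EventType, e.Cause)
}

// Unwrap exposes the cause to errors.Is and errors.As.
func (e *SerializationError) Unwrap() error { return e.Cause }

// errBadPayload is a hypothetical root cause.
var errBadPayload = errors.New("bad payload")

// load simulates a call site that wraps the serialization failure once more.
func load() error {
	inner := &SerializationError{EventType: "OrderCreated", Operation: "deserialize", Cause: errBadPayload}
	return fmt.Errorf("load aggregate: %w", inner)
}

// describe extracts the typed error from anywhere in the chain.
func describe(err error) string {
	var serr *SerializationError
	if errors.As(err, &serr) {
		return serr.Operation + ":" + serr.EventType
	}
	return "other"
}

// isBadPayload walks the chain through Unwrap down to the root cause.
func isBadPayload(err error) bool { return errors.Is(err, errBadPayload) }

func main() {
	err := load()
	fmt.Println(describe(err), isBadPayload(err))
}
```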
type Serializer ¶
type Serializer interface {
// Serialize converts an event to bytes.
Serialize(event interface{}) ([]byte, error)
// Deserialize converts bytes back to an event.
// The eventType is used to determine the target type.
Deserialize(data []byte, eventType string) (interface{}, error)
}
Serializer handles event payload serialization and deserialization.
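To illustrate the contract, here is a minimal JSON-based sketch that satisfies the interface. The interface is redeclared locally, and jsonSerializer with its type registry keyed by Go type name is a hypothetical design, not the serializer that ships with the library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Serializer is redeclared from the documentation so the sketch is self-contained.
type Serializer interface {
	Serialize(event interface{}) ([]byte, error)
	Deserialize(data []byte, eventType string) (interface{}, error)
}

// jsonSerializer is a hypothetical implementation that maps event type names
// to Go types so Deserialize knows what to allocate.
type jsonSerializer struct {
	types map[string]reflect.Type
}

func newJSONSerializer(samples ...interface{}) *jsonSerializer {
	s := &jsonSerializer{types: make(map[string]reflect.Type)}
	for _, sample := range samples {
		t := reflect.TypeOf(sample)
		s.types[t.Name()] = t
	}
	return s
}

func (s *jsonSerializer) Serialize(event interface{}) ([]byte, error) {
	return json.Marshal(event)
}

func (s *jsonSerializer) Deserialize(data []byte, eventType string) (interface{}, error) {
	t, ok := s.types[eventType]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", eventType)
	}
	ptr := reflect.New(t) // allocate *T for json.Unmarshal
	if err := json.Unmarshal(data, ptr.Interface()); err != nil {
		return nil, err
	}
	return ptr.Elem().Interface(), nil // hand back T by value
}

// OrderCreated matches the event defined earlier in this documentation.
type OrderCreated struct {
	OrderID    string `json:"orderId"`
	CustomerID string `json:"customerId"`
}

func main() {
	var s Serializer = newJSONSerializer(OrderCreated{})
	data, _ := s.Serialize(OrderCreated{OrderID: "order-1", CustomerID: "cust-9"})
	event, err := s.Deserialize(data, "OrderCreated")
	fmt.Println(event, err)
}
```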
type SimpleDispatcher ¶
type SimpleDispatcher struct {
// contains filtered or unexported fields
}
SimpleDispatcher is a basic dispatcher that forwards commands to handlers.
func NewSimpleDispatcher ¶
func NewSimpleDispatcher(registry *HandlerRegistry) *SimpleDispatcher
NewSimpleDispatcher creates a new SimpleDispatcher.
func (*SimpleDispatcher) Dispatch ¶
func (d *SimpleDispatcher) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
Dispatch sends a command to its handler.
type StoredEvent ¶
type StoredEvent struct {
// ID is the globally unique event identifier.
ID string
// StreamID identifies the stream this event belongs to.
StreamID string
// Type is the event type identifier.
Type string
// Data is the serialized event payload.
Data []byte
// Metadata contains contextual information.
Metadata Metadata
// Version is the position within the stream (1-based).
Version int64
// GlobalPosition is the position across all streams.
GlobalPosition uint64
// Timestamp is when the event was stored.
Timestamp time.Time
}
StoredEvent represents a persisted event with all storage metadata.
type StreamID ¶
type StreamID struct {
// Category represents the aggregate type (e.g., "Order", "Customer").
Category string
// ID is the unique identifier within the category (e.g., "order-123").
ID string
}
StreamID uniquely identifies an event stream. It consists of a category (aggregate type) and an instance ID.
func NewStreamID ¶
NewStreamID creates a new StreamID from category and ID.
func ParseStreamID ¶
ParseStreamID parses a stream ID string in the format "Category-ID". Returns an error if the format is invalid.
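The "Category-ID" format can be sketched as follows. The function parseStreamID is a hypothetical stand-in, and it assumes the split happens at the first hyphen so that IDs such as "order-123" may themselves contain hyphens. The library's actual splitting rule is not stated here.

```go
package main

import (
	"fmt"
	"strings"
)

// StreamID mirrors the documented struct.
type StreamID struct {
	Category string
	ID       string
}

// parseStreamID is a hypothetical stand-in for ParseStreamID. It assumes the
// category ends at the first hyphen and rejects empty parts.
func parseStreamID(raw string) (StreamID, error) {
	category, id, found := strings.Cut(raw, "-")
	if !found || category == "" || id == "" {
		return StreamID{}, fmt.Errorf("invalid stream ID %q: want \"Category-ID\"", raw)
	}
	return StreamID{Category: category, ID: id}, nil
}

func main() {
	id, err := parseStreamID("Order-order-123")
	fmt.Println(id.Category, id.ID, err)

	_, err = parseStreamID("no_separator")
	fmt.Println(err)
}
```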
type StreamInfo ¶
type StreamInfo struct {
// StreamID is the stream identifier.
StreamID string
// Category is the stream category (aggregate type).
Category string
// Version is the current stream version.
Version int64
// EventCount is the number of events in the stream.
EventCount int64
// CreatedAt is when the stream was created.
CreatedAt time.Time
// UpdatedAt is when the stream was last modified.
UpdatedAt time.Time
}
StreamInfo contains metadata about an event stream.
type StreamNotFoundError ¶
type StreamNotFoundError struct {
StreamID string
}
StreamNotFoundError provides detailed information about a missing stream.
func NewStreamNotFoundError ¶
func NewStreamNotFoundError(streamID string) *StreamNotFoundError
NewStreamNotFoundError creates a new StreamNotFoundError.
func (*StreamNotFoundError) Error ¶
func (e *StreamNotFoundError) Error() string
Error returns the error message.
func (*StreamNotFoundError) Is ¶
func (e *StreamNotFoundError) Is(target error) bool
Is reports whether this error matches the target error.
func (*StreamNotFoundError) Unwrap ¶
func (e *StreamNotFoundError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type Subscription ¶
type Subscription interface {
// Events returns the channel for receiving events.
Events() <-chan StoredEvent
// Close stops the subscription.
Close() error
// Err returns any error that caused the subscription to close.
Err() error
}
Subscription represents an active event subscription.
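A typical consumer reads from Events until the channel closes and then consults Err to learn why the subscription ended. The sketch below assumes the channel is closed when the subscription stops, which this section does not state explicitly. StoredEvent is trimmed to three fields, and fakeSubscription is a hypothetical in-memory implementation used only to drive the loop.

```go
package main

import (
	"errors"
	"fmt"
)

// StoredEvent is a trimmed copy of the documented struct.
type StoredEvent struct {
	StreamID       string
	Type           string
	GlobalPosition uint64
}

// Subscription is redeclared from the documentation.
type Subscription interface {
	Events() <-chan StoredEvent
	Close() error
	Err() error
}

// fakeSubscription is a hypothetical in-memory Subscription.
type fakeSubscription struct {
	ch  chan StoredEvent
	err error
}

func (f *fakeSubscription) Events() <-chan StoredEvent { return f.ch }
func (f *fakeSubscription) Close() error               { return nil }
func (f *fakeSubscription) Err() error                 { return f.err }

// drain reads events until the channel closes, then returns the last global
// position seen together with the reason the subscription ended.
func drain(sub Subscription) (uint64, error) {
	defer sub.Close()
	var last uint64
	for ev := range sub.Events() {
		last = ev.GlobalPosition
	}
	return last, sub.Err()
}

func main() {
	ch := make(chan StoredEvent, 2)
	ch <- StoredEvent{StreamID: "Order-1", Type: "OrderCreated", GlobalPosition: 1}
	ch <- StoredEvent{StreamID: "Order-1", Type: "ItemAdded", GlobalPosition: 2}
	close(ch)

	last, err := drain(&fakeSubscription{ch: ch, err: errors.New("connection lost")})
	fmt.Println(last, err)
}
```

Returning the last position lets the caller resume from that point when it resubscribes.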
type SubscriptionAdapter ¶
type SubscriptionAdapter interface {
// LoadFromPosition loads events starting from a global position.
LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)
// SubscribeAll subscribes to all events across all streams.
SubscribeAll(ctx context.Context, fromPosition uint64) (<-chan StoredEvent, error)
// SubscribeStream subscribes to events from a specific stream.
SubscribeStream(ctx context.Context, streamID string, fromVersion int64) (<-chan StoredEvent, error)
// SubscribeCategory subscribes to all events from streams in a category.
SubscribeCategory(ctx context.Context, category string, fromPosition uint64) (<-chan StoredEvent, error)
}
SubscriptionAdapter provides methods for subscribing to event streams. This interface extends the basic EventStoreAdapter with subscription capabilities.
type SubscriptionOptions ¶
type SubscriptionOptions struct {
// BufferSize is the size of the event channel buffer.
// Default: 256
BufferSize int
// Filter optionally filters which events are delivered.
Filter EventFilter
// RetryOnError determines whether to retry on transient errors.
// Default: true
RetryOnError bool
// RetryInterval is the time to wait between retries.
// Default: 1 second
RetryInterval time.Duration
// MaxRetries is the maximum number of retry attempts.
// Default: 5
MaxRetries int
}
SubscriptionOptions configures a subscription.
func DefaultSubscriptionOptions ¶
func DefaultSubscriptionOptions() SubscriptionOptions
DefaultSubscriptionOptions returns the default subscription options.
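Since RetryOnError defaults to true but the zero value of a Go bool is false, a struct literal that sets only one field ends up with RetryOnError false. The sketch below shows the safer approach of starting from the defaults and overriding individual fields. The struct is redeclared locally without Filter (its EventFilter type is not shown in this section), and both functions are stand-ins built from the documented defaults.

```go
package main

import (
	"fmt"
	"time"
)

// SubscriptionOptions mirrors the documented struct, minus Filter.
type SubscriptionOptions struct {
	BufferSize    int
	RetryOnError  bool
	RetryInterval time.Duration
	MaxRetries    int
}

// defaultSubscriptionOptions is a stand-in that returns the documented defaults.
func defaultSubscriptionOptions() SubscriptionOptions {
	return SubscriptionOptions{
		BufferSize:    256,
		RetryOnError:  true,
		RetryInterval: time.Second,
		MaxRetries:    5,
	}
}

// largeBufferOptions is a hypothetical helper: start from the defaults, then
// change only the field that needs to differ.
func largeBufferOptions() SubscriptionOptions {
	opts := defaultSubscriptionOptions()
	opts.BufferSize = 1024
	return opts
}

func main() {
	fmt.Printf("%+v\n", largeBufferOptions())
	// A bare literal leaves RetryOnError false and MaxRetries 0.
	fmt.Printf("%+v\n", SubscriptionOptions{BufferSize: 1024})
}
```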
type UpcastError ¶
UpcastError provides detailed information about an upcasting failure.
func NewUpcastError ¶
func NewUpcastError(eventType string, fromVersion, toVersion int, cause error) *UpcastError
NewUpcastError creates a new UpcastError.
func (*UpcastError) Is ¶
func (e *UpcastError) Is(target error) bool
Is reports whether this error matches the target error.
func (*UpcastError) Unwrap ¶
func (e *UpcastError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
type Upcaster ¶
type Upcaster interface {
// EventType returns the event type this upcaster handles.
EventType() string
// FromVersion returns the source schema version.
FromVersion() int
// ToVersion returns the target schema version. Must equal FromVersion() + 1.
ToVersion() int
// Upcast transforms event data from FromVersion to ToVersion.
// Metadata is provided as read-only context (e.g., for tenant-specific defaults).
Upcast(data []byte, metadata Metadata) ([]byte, error)
}
Upcaster transforms event data from one schema version to the next. Upcasters operate on raw bytes and are serializer-agnostic. Each upcaster handles exactly one version transition (FromVersion → ToVersion).
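As an illustration, the sketch below implements a single v1 to v2 transition that adds a missing field with a default value. The interface is redeclared locally, Metadata is an empty placeholder because its fields are not shown in this section, and the orderCreatedV1ToV2 type and the "currency" field are hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Metadata is a placeholder for the library's Metadata type.
type Metadata struct{}

// Upcaster is redeclared from the documentation.
type Upcaster interface {
	EventType() string
	FromVersion() int
	ToVersion() int
	Upcast(data []byte, metadata Metadata) ([]byte, error)
}

// orderCreatedV1ToV2 is a hypothetical upcaster that adds a "currency" field.
type orderCreatedV1ToV2 struct{}

func (orderCreatedV1ToV2) EventType() string { return "OrderCreated" }
func (orderCreatedV1ToV2) FromVersion() int  { return 1 }
func (orderCreatedV1ToV2) ToVersion() int    { return 2 }

// Upcast works on raw JSON and keeps existing values byte-for-byte by
// decoding into json.RawMessage instead of concrete Go types.
func (orderCreatedV1ToV2) Upcast(data []byte, _ Metadata) ([]byte, error) {
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(data, &fields); err != nil {
		return nil, fmt.Errorf("decode v1 payload: %w", err)
	}
	if fields == nil {
		return nil, errors.New("payload is not a JSON object")
	}
	if _, ok := fields["currency"]; !ok {
		fields["currency"] = json.RawMessage(`"USD"`)
	}
	return json.Marshal(fields)
}

func main() {
	var u Upcaster = orderCreatedV1ToV2{}
	out, err := u.Upcast([]byte(`{"orderId":"order-1"}`), Metadata{})
	fmt.Println(string(out), err)
}
```

Working on raw bytes keeps the upcaster independent of the Go struct that the newer code deserializes into.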
type UpcasterChain ¶
type UpcasterChain struct {
// contains filtered or unexported fields
}
UpcasterChain is a thread-safe registry of upcasters that applies them in sequence. It validates that there are no gaps or duplicates in the version chain.
func NewUpcasterChain ¶
func NewUpcasterChain() *UpcasterChain
NewUpcasterChain creates a new empty UpcasterChain.
func (*UpcasterChain) HasUpcasters ¶
func (c *UpcasterChain) HasUpcasters(eventType string) bool
HasUpcasters reports whether any upcasters are registered for the given event type.
func (*UpcasterChain) LatestVersion ¶
func (c *UpcasterChain) LatestVersion(eventType string) int
LatestVersion returns the latest schema version for the given event type. Returns DefaultSchemaVersion if no upcasters are registered for the type.
func (*UpcasterChain) Register ¶
func (c *UpcasterChain) Register(u Upcaster) error
Register adds an upcaster to the chain. Returns an error if the upcaster's ToVersion != FromVersion + 1 or if an upcaster for the same event type and version transition already exists.
func (*UpcasterChain) RegisteredEventTypes ¶
func (c *UpcasterChain) RegisteredEventTypes() []string
RegisteredEventTypes returns a sorted list of event types that have upcasters registered.
func (*UpcasterChain) Upcast ¶
func (c *UpcasterChain) Upcast(eventType string, fromVersion int, data []byte, metadata Metadata) ([]byte, int, error)
Upcast transforms event data from fromVersion to the latest version. Returns the transformed data, the final version, and any error. If no upcasters exist for the event type or the data is already at the latest version, the original data is returned unchanged.
func (*UpcasterChain) Validate ¶
func (c *UpcasterChain) Validate() error
Validate checks the entire chain for gaps. For each event type, the upcasters must form a contiguous chain from the lowest FromVersion to the highest ToVersion.
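The sequencing and gap-checking behavior described for Upcast and Validate can be sketched with a simplified model. This is not the library's implementation: the chain type below is hypothetical, covers a single event type, ignores metadata, and is not thread-safe.

```go
package main

import "fmt"

// transition upgrades a payload by exactly one schema version.
type transition func(data []byte) ([]byte, error)

// chain maps a source version to the transition that produces version+1.
type chain map[int]transition

// upcast applies transitions in sequence starting at fromVersion. When no
// transition is registered for the current version, the data is returned as-is.
func (c chain) upcast(fromVersion int, data []byte) ([]byte, int, error) {
	version := fromVersion
	for {
		step, ok := c[version]
		if !ok {
			return data, version, nil
		}
		out, err := step(data)
		if err != nil {
			return nil, version, fmt.Errorf("upcast v%d to v%d: %w", version, version+1, err)
		}
		data, version = out, version+1
	}
}

// validate reports a gap if any version between the lowest and highest
// registered source versions has no transition.
func (c chain) validate() error {
	if len(c) == 0 {
		return nil
	}
	first := true
	lo, hi := 0, 0
	for v := range c {
		if first || v < lo {
			lo = v
		}
		if first || v > hi {
			hi = v
		}
		first = false
	}
	for v := lo; v <= hi; v++ {
		if _, ok := c[v]; !ok {
			return fmt.Errorf("gap in chain: no upcaster from v%d to v%d", v, v+1)
		}
	}
	return nil
}

// appendByte builds a toy transition that appends one byte to a copy of the payload.
func appendByte(b byte) transition {
	return func(data []byte) ([]byte, error) {
		return append(append([]byte{}, data...), b), nil
	}
}

func main() {
	c := chain{1: appendByte('a'), 2: appendByte('b')}
	out, version, err := c.upcast(1, []byte("x"))
	fmt.Println(string(out), version, err, c.validate())
}
```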
type UpcastingSerializer ¶
type UpcastingSerializer struct {
// contains filtered or unexported fields
}
UpcastingSerializer is a decorator that wraps any Serializer and applies upcasting during deserialization. Serialization passes through unchanged.
func NewUpcastingSerializer ¶
func NewUpcastingSerializer(inner Serializer, chain *UpcasterChain) *UpcastingSerializer
NewUpcastingSerializer creates a new UpcastingSerializer wrapping the given serializer.
func (*UpcastingSerializer) Chain ¶
func (s *UpcastingSerializer) Chain() *UpcasterChain
Chain returns the upcaster chain.
func (*UpcastingSerializer) Deserialize ¶
func (s *UpcastingSerializer) Deserialize(data []byte, eventType string) (interface{}, error)
Deserialize converts bytes back to an event. If upcasters are registered for the event type, the data is upcast from DefaultSchemaVersion before deserialization.
func (*UpcastingSerializer) DeserializeWithVersion ¶
func (s *UpcastingSerializer) DeserializeWithVersion(data []byte, eventType string, schemaVersion int, metadata Metadata) (interface{}, error)
DeserializeWithVersion converts bytes back to an event, upcasting from the specified schema version. Metadata is provided as read-only context to upcasters.
func (*UpcastingSerializer) Inner ¶
func (s *UpcastingSerializer) Inner() Serializer
Inner returns the wrapped serializer.
func (*UpcastingSerializer) Serialize ¶
func (s *UpcastingSerializer) Serialize(event interface{}) ([]byte, error)
Serialize converts an event to bytes. Pass-through to the inner serializer.
type ValidationError ¶
type ValidationError struct {
// CommandType is the type of command that failed validation.
CommandType string
// Field is the field that failed validation (optional).
Field string
// Message describes the validation failure.
Message string
// Cause is the underlying error (optional).
Cause error
}
ValidationError represents a command validation failure.
func NewValidationError ¶
func NewValidationError(cmdType, field, message string) *ValidationError
NewValidationError creates a new ValidationError.
func NewValidationErrorWithCause ¶
func NewValidationErrorWithCause(cmdType, field, message string, cause error) *ValidationError
NewValidationErrorWithCause creates a new ValidationError with an underlying cause.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
Error returns the error message.
func (*ValidationError) Is ¶
func (e *ValidationError) Is(target error) bool
Is reports whether this error matches the target error.
func (*ValidationError) Unwrap ¶
func (e *ValidationError) Unwrap() error
Unwrap returns the underlying cause for errors.Unwrap().
type Validator ¶
type Validator interface {
// Validate validates a command and returns validation errors.
Validate(cmd Command) error
}
Validator provides command validation functionality.
type ValidatorFunc ¶
ValidatorFunc is a function that implements Validator.
func (ValidatorFunc) Validate ¶
func (f ValidatorFunc) Validate(cmd Command) error
Validate implements Validator.
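ValidatorFunc follows the same adapter idea as http.HandlerFunc: it lets a plain function satisfy the Validator interface. In the sketch below, the underlying function type is inferred from the Validate method shown above, Command is a placeholder interface because its definition is not part of this section, and createOrder and requireCustomer are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Command is a placeholder for the library's Command interface.
type Command interface{ CommandType() string }

// Validator is redeclared from the documentation.
type Validator interface {
	Validate(cmd Command) error
}

// ValidatorFunc is assumed to have this underlying type, inferred from its
// documented Validate method.
type ValidatorFunc func(cmd Command) error

// Validate calls the function itself, which is what makes it a Validator.
func (f ValidatorFunc) Validate(cmd Command) error { return f(cmd) }

// createOrder is a hypothetical command.
type createOrder struct{ CustomerID string }

func (createOrder) CommandType() string { return "CreateOrder" }

// requireCustomer wraps a closure as a Validator without declaring a new type.
var requireCustomer Validator = ValidatorFunc(func(cmd Command) error {
	if c, ok := cmd.(createOrder); ok && c.CustomerID == "" {
		return errors.New("customerId is required")
	}
	return nil
})

func main() {
	fmt.Println(requireCustomer.Validate(createOrder{}))
	fmt.Println(requireCustomer.Validate(createOrder{CustomerID: "cust-9"}))
}
```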
type VersionSetter ¶
type VersionSetter interface {
SetVersion(v int64)
}
VersionSetter is an optional interface that aggregates can implement to allow the EventStore to set their version during loading. This is used for optimistic concurrency control in SaveAggregate. AggregateBase implements this interface.
type VersionedAggregate ¶
type VersionedAggregate interface {
Aggregate
// OriginalVersion returns the version when the aggregate was loaded.
OriginalVersion() int64
}
VersionedAggregate provides versioning information for optimistic concurrency.
Source Files ¶
- aggregate.go
- bus.go
- command.go
- encryption.go
- encryption_errors.go
- errors.go
- event.go
- export.go
- export_errors.go
- handler.go
- idempotency.go
- middleware.go
- mink.go
- outbox.go
- outbox_processor.go
- projection.go
- projection_engine.go
- rebuilder.go
- repository.go
- saga.go
- saga_manager.go
- schema_registry.go
- serializer.go
- store.go
- subscription.go
- upcasting_serializer.go
- versioning.go
- versioning_errors.go
Directories ¶
| Path | Synopsis |
|---|---|
| adapters | Package adapters provides interfaces for event store backends. |
| adapters/memory | Package memory provides an in-memory implementation of the event store adapter. |
| adapters/postgres | Package postgres provides a PostgreSQL implementation of the event store adapter. |
| cli | |
| cli/commands | Package commands provides the CLI command implementations for mink. |
| cli/config | Package config provides configuration management for the mink CLI. |
| cli/styles | Package styles provides consistent styling for the go-mink CLI. |
| cli/ui | Package ui provides reusable UI components for the go-mink CLI. |
| cmd | |
| cmd/mink (command) | mink is the command-line interface for the go-mink event sourcing library. |
| encryption | Package encryption defines interfaces and types for field-level encryption in event sourcing. |
| encryption/kms | Package kms provides an AWS KMS encryption provider for field-level encryption. |
| encryption/local | Package local provides an in-memory AES-256-GCM encryption provider for testing. |
| encryption/providertest | Package providertest provides shared test helpers for encryption.Provider implementations. |
| encryption/vault | Package vault provides a HashiCorp Vault Transit encryption provider for field-level encryption. |
| examples | |
| examples/basic (command) | Package main demonstrates the basic usage of go-mink for event sourcing. |
| examples/cqrs (command) | Package main demonstrates the CQRS and Command Bus features of go-mink (Phase 2). |
| examples/cqrs-postgres (command) | Package main demonstrates Phase 2 CQRS & Command Bus features with PostgreSQL. |
| examples/encryption (command) | Package main demonstrates field-level encryption in go-mink. |
| examples/export (command) | Package main demonstrates GDPR data export (right to access / data portability) in go-mink. |
| examples/full-ecommerce (command) | Package main demonstrates a complete e-commerce order fulfillment system using go-mink. |
| examples/metrics (command) | Example: Metrics Middleware |
| examples/msgpack (command) | Example: MessagePack Serializer |
| examples/projections (command) | Package main demonstrates the projection and read model features of go-mink. |
| examples/protobuf (command) | Example: Protocol Buffers Serializer |
| examples/sagas (command) | Example: Saga (Process Manager) Pattern |
| examples/tracing (command) | Example: Distributed Tracing with OpenTelemetry |
| examples/versioning (command) | Package main demonstrates event versioning and upcasting in go-mink. |
| middleware | |
| middleware/metrics | Package metrics provides Prometheus metrics integration for mink. |
| middleware/tracing | Package tracing provides OpenTelemetry integration for mink. |
| outbox | |
| outbox/kafka | Package kafka provides a Kafka publisher for the outbox pattern. |
| outbox/sns | Package sns provides an AWS SNS publisher for the outbox pattern. |
| outbox/webhook | Package webhook provides a webhook publisher for the outbox pattern. |
| serializer | |
| serializer/msgpack | Package msgpack provides a MessagePack serializer implementation for mink. |
| serializer/protobuf | Package protobuf provides a Protocol Buffers serializer for mink events. |
| testing | |
| testing/assertions | Package assertions provides event assertion utilities for testing event-sourced systems. |
| testing/bdd | Package bdd provides BDD-style test fixtures for event-sourced aggregates. |
| testing/benchmarks | Package benchmarks provides a shared benchmark suite for EventStoreAdapter implementations. |
| testing/containers | Package containers provides test container utilities for integration testing. |
| testing/projections | Package projections provides testing utilities for projection development. |
| testing/sagas | Package sagas provides testing utilities for saga (process manager) development. |
| testing/testutil | Package testutil provides test utilities and fixtures for go-mink. |