Versions in this module Expand all Collapse all v1 v1.0.3 Aug 21, 2025 v1.0.2 Aug 20, 2025 v1.0.1 Aug 10, 2025 Changes in this version type EventStore + AppendEvents func(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, ...) error + CreateSnapshot func(ctx context.Context, aggregateID, aggregateType string, version int, ...) error + GetAggregateVersion func(ctx context.Context, aggregateID, aggregateType string) (int, error) + GetLatestSnapshot func(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error) type EventStream + FromSnapshot bool + FromVersion int + ToVersion int v1.0.0 Aug 10, 2025 Changes in this version + type BaseEvent struct + AggregateID string + AggregateType string + CausationID string + CorrelationID string + Data map[string]interface{} + EventData map[string]interface{} + EventType string + Metadata map[string]interface{} + TenantID string + Timestamp time.Time + UserID string + Version int + func (e *BaseEvent) GetAggregateID() string + func (e *BaseEvent) GetAggregateType() string + func (e *BaseEvent) GetEventData() map[string]interface{} + func (e *BaseEvent) GetEventType() string + func (e *BaseEvent) GetMetadata() map[string]interface{} + func (e *BaseEvent) GetTimestamp() time.Time + func (e *BaseEvent) GetVersion() int + func (e *BaseEvent) WithMetadata(key string, value interface{}) *BaseEvent + type Event interface + GetAggregateID func() string + GetAggregateType func() string + GetEventData func() map[string]interface{} + GetEventType func() string + GetMetadata func() map[string]interface{} + GetTimestamp func() time.Time + GetVersion func() int + func NewEvent(eventType, aggregateID, aggregateType string, data map[string]interface{}, ...) Event + type EventStream struct + AggregateID string + AggregateType string + Events []*StoredEvent + Timestamp time.Time + Version int + func (es *EventStream) GetEvents() ([]Event, error) + type Snapshot struct + AggregateID string + AggregateType string + CreatedAt time.Time + Data string + ID string + Timestamp time.Time + Version int + type StoredEvent struct + AggregateID string + AggregateType string + CausationID string + CorrelationID string + CreatedAt time.Time + EventData string + EventType string + ID string + Metadata string + TenantID string + Timestamp time.Time + UserID string + Version int + func FromEvent(event Event) (*StoredEvent, error) + func (se *StoredEvent) ToEvent() (Event, error) v0 v0.1.0 Aug 4, 2025 Changes in this version + func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool) + type Aggregate interface + ApplyEvent func(event Event) error + GetID func() string + GetType func() string + GetUncommittedEvents func() []Event + GetVersion func() int + LoadFromHistory func(events []*StoredEvent) error + MarkEventsAsCommitted func() + type AggregateConfig struct + EnableSnapshots bool + MaxEventCount int + SnapshotFrequency int + type AggregateFactory struct + func NewAggregateFactory(logger *logrus.Logger) *AggregateFactory + func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error) + func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error) + func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error) + func (af *AggregateFactory) GetRegisteredTypes() []string + func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{}) + type AggregateRepository struct + func NewAggregateRepository(eventStore *EventStore, config AggregateConfig, logger *logrus.Logger) *AggregateRepository + func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error + func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error) + func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error) + func (ar *AggregateRepository) Load(ctx context.Context, aggregateID, aggregateType string, aggregate Aggregate) error + func (ar *AggregateRepository) Save(ctx context.Context, aggregate Aggregate) error + type AsyncCommandResult struct + Error error + Result *CommandResult + type AsyncQueryResult struct + Error error + Result *QueryResult + type BaseAggregate struct + CreatedAt time.Time + ID string + TenantID string + Type string + UncommittedEvents []Event + UpdatedAt time.Time + Version int + func (ba *BaseAggregate) AddEvent(event Event) + func (ba *BaseAggregate) ApplyEvent(event Event) error + func (ba *BaseAggregate) GetID() string + func (ba *BaseAggregate) GetType() string + func (ba *BaseAggregate) GetUncommittedEvents() []Event + func (ba *BaseAggregate) GetVersion() int + func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error + func (ba *BaseAggregate) MarkEventsAsCommitted() + type BaseCommand struct + AggregateID string + CommandType string + CorrelationID string + Metadata map[string]interface{} + TenantID string + UserID string + func (bc *BaseCommand) GetAggregateID() string + func (bc *BaseCommand) GetCommandType() string + func (bc *BaseCommand) GetMetadata() map[string]interface{} + type BaseProjection struct + EventTypes []string + Logger *logrus.Logger + Name string + Position int64 + ReadStore *ReadModelStore + func (bp *BaseProjection) GetEventTypes() []string + func (bp *BaseProjection) GetName() string + func (bp *BaseProjection) GetPosition() int64 + func (bp *BaseProjection) SetPosition(position int64) + type BaseQuery struct + CacheKey string + Cacheable bool + CorrelationID string + Parameters map[string]interface{} + QueryType string + TenantID string + UserID string + func (bq *BaseQuery) GetCacheKey() string + func (bq *BaseQuery) GetParameters() map[string]interface{} + func (bq *BaseQuery) GetQueryType() string + func (bq *BaseQuery) IsCacheable() bool + type BaseReadModel struct + ID string + LastUpdated time.Time + Metadata map[string]interface{} + TenantID string + Type string + Version int + func (brm *BaseReadModel) GetID() string + func (brm *BaseReadModel) GetLastUpdated() time.Time + func (brm *BaseReadModel) GetType() string + func (brm *BaseReadModel) GetVersion() int + type BatchCommand struct + Commands []Command + Transaction bool + type BatchResult struct + FailedAt int + Results []*CommandResult + Success bool + TotalCount int + type CacheStats struct + Entries int + HitRatio float64 + Hits int64 + Misses int64 + type Command interface + GetAggregateID func() string + GetCommandType func() string + GetMetadata func() map[string]interface{} + type CommandBus struct + func NewCommandBus(config CommandBusConfig, logger *logrus.Logger) *CommandBus + func (cb *CommandBus) Close() error + func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error) + func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult + func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error) + func (cb *CommandBus) GetHandlerCount() int + func (cb *CommandBus) GetRegisteredCommands() []string + func (cb *CommandBus) HealthCheck(ctx context.Context) error + func (cb *CommandBus) RegisterHandler(handler CommandHandler) error + func (cb *CommandBus) UnregisterHandler(commandType string) error + func (cb *CommandBus) ValidateCommand(command Command) error + type CommandBusConfig struct + EnableMetrics bool + EnableTracing bool + MaxConcurrent int + TimeoutDuration int + type CommandHandler interface + GetCommandType func() string + Handle func(ctx context.Context, command Command) (*CommandResult, error) + type CommandResult struct + AggregateID string + EventsEmitted []interface{} + Message string + Metadata map[string]interface{} + Success bool + Version int + type EventStore interface + GetEventCount func(ctx context.Context) (int64, error) + GetEventStream func(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error) + GetEventsByTimeRange func(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error) + GetEventsByType func(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error) + type InMemoryQueryCache struct + func NewInMemoryQueryCache() *InMemoryQueryCache + func (imc *InMemoryQueryCache) Clear() + func (imc *InMemoryQueryCache) Delete(key string) + func (imc *InMemoryQueryCache) Get(key string) (*QueryResult, bool) + func (imc *InMemoryQueryCache) GetStats() CacheStats + func (imc *InMemoryQueryCache) Set(key string, result *QueryResult, ttl time.Duration) + type PaginatedQuery struct + Page int + PageSize int + SortBy string + SortDir string + func NewPaginatedQuery(queryType string, page, pageSize int) *PaginatedQuery + type PaginatedResult struct + HasNext bool + HasPrevious bool + Page int + PageSize int + TotalCount int + TotalPages int + type Projection interface + GetEventTypes func() []string + GetName func() string + GetPosition func() int64 + Project func(ctx context.Context, event *StoredEvent) (*ProjectionResult, error) + Reset func(ctx context.Context) error + SetPosition func(position int64) + type ProjectionConfig struct + BatchSize int + EnableMetrics bool + ErrorRetryCount int + ErrorRetryDelay time.Duration + PollInterval time.Duration + type ProjectionManager struct + func NewProjectionManager(eventStore EventStore, readStore *ReadModelStore, config ProjectionConfig, ...) *ProjectionManager + func (pm *ProjectionManager) Close() error + func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error) + func (pm *ProjectionManager) GetProjectionCount() int + func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error) + func (pm *ProjectionManager) HealthCheck(ctx context.Context) error + func (pm *ProjectionManager) IsRunning() bool + func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error + func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error + func (pm *ProjectionManager) RegisterProjection(projection Projection) error + func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error + func (pm *ProjectionManager) Start(ctx context.Context) error + func (pm *ProjectionManager) Stop() error + func (pm *ProjectionManager) UnregisterProjection(name string) error + type ProjectionResult struct + Message string + Position int64 + ReadModels []ReadModel + Success bool + type ProjectionState struct + ErrorsCount int64 + EventsCount int64 + IsRunning bool + LastError string + LastUpdated time.Time + Name string + Position int64 + type Query interface + GetCacheKey func() string + GetParameters func() map[string]interface{} + GetQueryType func() string + IsCacheable func() bool + type QueryBus struct + func NewQueryBus(config QueryBusConfig, logger *logrus.Logger) *QueryBus + func (qb *QueryBus) Close() error + func (qb *QueryBus) Execute(ctx context.Context, query Query) (*QueryResult, error) + func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult + func (qb *QueryBus) GetCacheStats() *CacheStats + func (qb *QueryBus) GetHandlerCount() int + func (qb *QueryBus) GetRegisteredQueries() []string + func (qb *QueryBus) HealthCheck(ctx context.Context) error + func (qb *QueryBus) InvalidateCache(pattern string) + func (qb *QueryBus) RegisterHandler(handler QueryHandler) error + func (qb *QueryBus) UnregisterHandler(queryType string) error + func (qb *QueryBus) ValidateQuery(query Query) error + type QueryBusConfig struct + CacheTTL time.Duration + EnableCache bool + EnableMetrics bool + EnableTracing bool + MaxConcurrent int + TimeoutDuration time.Duration + type QueryCache interface + Clear func() + Delete func(key string) + Get func(key string) (*QueryResult, bool) + GetStats func() CacheStats + Set func(key string, result *QueryResult, ttl time.Duration) + type QueryHandler interface + GetQueryType func() string + Handle func(ctx context.Context, query Query) (*QueryResult, error) + type QueryResult struct + Count int + Data interface{} + FromCache bool + Message string + Metadata map[string]interface{} + Success bool + Timestamp time.Time + type ReadModel interface + GetID func() string + GetLastUpdated func() time.Time + GetType func() string + GetVersion func() int + type ReadModelConfig struct + BatchSize int + DefaultTTL time.Duration + EnableTTL bool + EnableVersioning bool + TablePrefix string + type ReadModelRecord struct + CreatedAt time.Time + Data string + ExpiresAt *time.Time + ID string + Metadata string + TenantID string + Type string + UpdatedAt time.Time + Version int + type ReadModelStore struct + func NewReadModelStore(db *gorm.DB, config ReadModelConfig, logger *logrus.Logger) (*ReadModelStore, error) + func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error) + func (rms *ReadModelStore) Close() error + func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error + func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error + func (rms *ReadModelStore) Exists(ctx context.Context, id, modelType string) (bool, error) + func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error + func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error) + func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, ...) ([]*ReadModelRecord, int64, error) + func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error + func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error + type UserAggregate struct + DeletedAt *time.Time + Email string + IsActive bool + Name string + func NewUserAggregate(id, email, name string) *UserAggregate + func (ua *UserAggregate) ApplyEvent(event Event) error + func (ua *UserAggregate) ChangeEmail(newEmail string, userID string) + func (ua *UserAggregate) Deactivate(userID string, reason string)