Documentation ¶
Index ¶
- func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool)
- type Aggregate
- type AggregateConfig
- type AggregateFactory
- func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error)
- func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error)
- func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error)
- func (af *AggregateFactory) GetRegisteredTypes() []string
- func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{})
- type AggregateRepository
- func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error
- func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error)
- func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
- func (ar *AggregateRepository) Load(ctx context.Context, aggregateID, aggregateType string, aggregate Aggregate) error
- func (ar *AggregateRepository) Save(ctx context.Context, aggregate Aggregate) error
- type AsyncCommandResult
- type AsyncQueryResult
- type BaseAggregate
- func (ba *BaseAggregate) AddEvent(event Event)
- func (ba *BaseAggregate) ApplyEvent(event Event) error
- func (ba *BaseAggregate) GetID() string
- func (ba *BaseAggregate) GetType() string
- func (ba *BaseAggregate) GetUncommittedEvents() []Event
- func (ba *BaseAggregate) GetVersion() int
- func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error
- func (ba *BaseAggregate) MarkEventsAsCommitted()
- type BaseCommand
- type BaseEvent
- func (e *BaseEvent) GetAggregateID() string
- func (e *BaseEvent) GetAggregateType() string
- func (e *BaseEvent) GetEventData() map[string]interface{}
- func (e *BaseEvent) GetEventType() string
- func (e *BaseEvent) GetMetadata() map[string]interface{}
- func (e *BaseEvent) GetTimestamp() time.Time
- func (e *BaseEvent) GetVersion() int
- func (e *BaseEvent) WithMetadata(key string, value interface{}) *BaseEvent
- type BaseProjection
- type BaseQuery
- type BaseReadModel
- type BatchCommand
- type BatchResult
- type CacheStats
- type Command
- type CommandBus
- func (cb *CommandBus) Close() error
- func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error)
- func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult
- func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error)
- func (cb *CommandBus) GetHandlerCount() int
- func (cb *CommandBus) GetRegisteredCommands() []string
- func (cb *CommandBus) HealthCheck(ctx context.Context) error
- func (cb *CommandBus) RegisterHandler(handler CommandHandler) error
- func (cb *CommandBus) UnregisterHandler(commandType string) error
- func (cb *CommandBus) ValidateCommand(command Command) error
- type CommandBusConfig
- type CommandHandler
- type CommandResult
- type Event
- type EventStore
- type EventStream
- type InMemoryQueryCache
- type PaginatedQuery
- type PaginatedResult
- type Projection
- type ProjectionConfig
- type ProjectionManager
- func (pm *ProjectionManager) Close() error
- func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error)
- func (pm *ProjectionManager) GetProjectionCount() int
- func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error)
- func (pm *ProjectionManager) HealthCheck(ctx context.Context) error
- func (pm *ProjectionManager) IsRunning() bool
- func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error
- func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error
- func (pm *ProjectionManager) RegisterProjection(projection Projection) error
- func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error
- func (pm *ProjectionManager) Start(ctx context.Context) error
- func (pm *ProjectionManager) Stop() error
- func (pm *ProjectionManager) UnregisterProjection(name string) error
- type ProjectionResult
- type ProjectionState
- type Query
- type QueryBus
- func (qb *QueryBus) Close() error
- func (qb *QueryBus) Execute(ctx context.Context, query Query) (*QueryResult, error)
- func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult
- func (qb *QueryBus) GetCacheStats() *CacheStats
- func (qb *QueryBus) GetHandlerCount() int
- func (qb *QueryBus) GetRegisteredQueries() []string
- func (qb *QueryBus) HealthCheck(ctx context.Context) error
- func (qb *QueryBus) InvalidateCache(pattern string)
- func (qb *QueryBus) RegisterHandler(handler QueryHandler) error
- func (qb *QueryBus) UnregisterHandler(queryType string) error
- func (qb *QueryBus) ValidateQuery(query Query) error
- type QueryBusConfig
- type QueryCache
- type QueryHandler
- type QueryResult
- type ReadModel
- type ReadModelConfig
- type ReadModelRecord
- type ReadModelStore
- func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error)
- func (rms *ReadModelStore) Close() error
- func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error
- func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error
- func (rms *ReadModelStore) Exists(ctx context.Context, id, modelType string) (bool, error)
- func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error
- func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error)
- func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, limit, offset int) ([]*ReadModelRecord, int64, error)
- func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error
- func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error
- type Snapshot
- type StoredEvent
- type UserAggregate
Constants ¶
This section is empty.
Variables ¶
This section is empty.
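Functions ¶
func CalculatePagination ¶
func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool)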
Types ¶
type Aggregate ¶
type Aggregate interface { GetID() string GetType() string GetVersion() int GetUncommittedEvents() []Event MarkEventsAsCommitted() LoadFromHistory(events []*StoredEvent) error ApplyEvent(event Event) error }
Aggregate represents a domain aggregate root
type AggregateConfig ¶
type AggregateConfig struct { SnapshotFrequency int `json:"snapshot_frequency"` EnableSnapshots bool `json:"enable_snapshots"` MaxEventCount int `json:"max_event_count"` }
AggregateConfig holds aggregate repository configuration
type AggregateFactory ¶
type AggregateFactory struct {
// contains filtered or unexported fields
}
AggregateFactory creates aggregate instances
func NewAggregateFactory ¶
func NewAggregateFactory(logger *logrus.Logger) *AggregateFactory
NewAggregateFactory creates a new aggregate factory
func (*AggregateFactory) Create ¶
func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error)
Create creates a new aggregate instance
func (*AggregateFactory) CreateNew ¶
func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error)
CreateNew creates a new aggregate with a generated UUID
func (*AggregateFactory) CreateWithID ¶
func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error)
CreateWithID creates a new aggregate instance with a specific ID
func (*AggregateFactory) GetRegisteredTypes ¶
func (af *AggregateFactory) GetRegisteredTypes() []string
GetRegisteredTypes returns all registered aggregate types
func (*AggregateFactory) RegisterType ¶
func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{})
RegisterType registers an aggregate type
type AggregateRepository ¶
type AggregateRepository struct {
// contains filtered or unexported fields
}
AggregateRepository handles aggregate persistence and retrieval
func NewAggregateRepository ¶
func NewAggregateRepository(eventStore EventStore, config AggregateConfig, logger *logrus.Logger) *AggregateRepository
NewAggregateRepository creates a new aggregate repository
func (*AggregateRepository) Delete ¶
func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error
Delete marks an aggregate as deleted (typically by adding a deletion event)
func (*AggregateRepository) Exists ¶
func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error)
Exists checks if an aggregate exists
func (*AggregateRepository) GetVersion ¶
func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
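GetVersion returns the current version of an aggregate
func (*AggregateRepository) Load ¶
func (ar *AggregateRepository) Load(ctx context.Context, aggregateID, aggregateType string, aggregate Aggregate) error
func (*AggregateRepository) Save ¶
func (ar *AggregateRepository) Save(ctx context.Context, aggregate Aggregate) error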
type AsyncCommandResult ¶
type AsyncCommandResult struct { Result *CommandResult Error error }
AsyncCommandResult represents the result of an async command execution
type AsyncQueryResult ¶
type AsyncQueryResult struct { Result *QueryResult Error error }
AsyncQueryResult represents the result of an async query execution
type BaseAggregate ¶
type BaseAggregate struct { ID string `json:"id"` Type string `json:"type"` Version int `json:"version"` UncommittedEvents []Event `json:"-"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` TenantID string `json:"tenant_id"` }
BaseAggregate provides a base implementation for aggregates
func (*BaseAggregate) AddEvent ¶
func (ba *BaseAggregate) AddEvent(event Event)
AddEvent adds an event to the uncommitted events list
func (*BaseAggregate) ApplyEvent ¶
func (ba *BaseAggregate) ApplyEvent(event Event) error
ApplyEvent applies an event to the aggregate (to be overridden by concrete aggregates)
func (*BaseAggregate) GetID ¶
func (ba *BaseAggregate) GetID() string
GetID returns the aggregate ID
func (*BaseAggregate) GetType ¶
func (ba *BaseAggregate) GetType() string
GetType returns the aggregate type
func (*BaseAggregate) GetUncommittedEvents ¶
func (ba *BaseAggregate) GetUncommittedEvents() []Event
GetUncommittedEvents returns uncommitted events
func (*BaseAggregate) GetVersion ¶
func (ba *BaseAggregate) GetVersion() int
GetVersion returns the aggregate version
func (*BaseAggregate) LoadFromHistory ¶
func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error
LoadFromHistory loads the aggregate from historical events
func (*BaseAggregate) MarkEventsAsCommitted ¶
func (ba *BaseAggregate) MarkEventsAsCommitted()
MarkEventsAsCommitted marks all uncommitted events as committed
type BaseCommand ¶
type BaseCommand struct { CommandType string `json:"command_type"` AggregateID string `json:"aggregate_id"` Metadata map[string]interface{} `json:"metadata"` UserID string `json:"user_id"` TenantID string `json:"tenant_id"` CorrelationID string `json:"correlation_id"` }
BaseCommand provides a base implementation for commands
func (*BaseCommand) GetAggregateID ¶
func (bc *BaseCommand) GetAggregateID() string
GetAggregateID returns the aggregate ID
func (*BaseCommand) GetCommandType ¶
func (bc *BaseCommand) GetCommandType() string
GetCommandType returns the command type
func (*BaseCommand) GetMetadata ¶
func (bc *BaseCommand) GetMetadata() map[string]interface{}
GetMetadata returns the command metadata
type BaseEvent ¶ added in v1.0.0
type BaseEvent struct { EventType string `json:"event_type"` AggregateID string `json:"aggregate_id"` AggregateType string `json:"aggregate_type"` EventData map[string]interface{} `json:"event_data"` Metadata map[string]interface{} `json:"metadata"` Timestamp time.Time `json:"timestamp"` Version int `json:"version"` Data map[string]interface{} `json:"data"` CorrelationID string `json:"correlation_id"` CausationID string `json:"causation_id"` UserID string `json:"user_id"` TenantID string `json:"tenant_id"` }
BaseEvent provides a basic implementation of Event
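func (*BaseEvent) GetAggregateID ¶ added in v1.0.0
func (e *BaseEvent) GetAggregateID() string
GetAggregateID returns the aggregate ID
func (*BaseEvent) GetAggregateType ¶ added in v1.0.0
func (e *BaseEvent) GetAggregateType() string
GetAggregateType returns the aggregate type
func (*BaseEvent) GetEventData ¶ added in v1.0.0
func (e *BaseEvent) GetEventData() map[string]interface{}
GetEventData returns the event data
func (*BaseEvent) GetEventType ¶ added in v1.0.0
func (e *BaseEvent) GetEventType() string
GetEventType returns the event type
func (*BaseEvent) GetMetadata ¶ added in v1.0.0
func (e *BaseEvent) GetMetadata() map[string]interface{}
GetMetadata returns the event metadata
func (*BaseEvent) GetTimestamp ¶ added in v1.0.0
func (e *BaseEvent) GetTimestamp() time.Time
GetTimestamp returns the event timestamp
func (*BaseEvent) GetVersion ¶ added in v1.0.0
func (e *BaseEvent) GetVersion() int
GetVersion returns the event version
func (*BaseEvent) WithMetadata ¶ added in v1.0.0
func (e *BaseEvent) WithMetadata(key string, value interface{}) *BaseEvent
WithMetadata adds metadata to an event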
type BaseProjection ¶
type BaseProjection struct { Name string `json:"name"` EventTypes []string `json:"event_types"` Position int64 `json:"position"` Logger *logrus.Logger ReadStore *ReadModelStore }
BaseProjection provides a base implementation for projections
func (*BaseProjection) GetEventTypes ¶
func (bp *BaseProjection) GetEventTypes() []string
GetEventTypes returns the event types this projection handles
func (*BaseProjection) GetName ¶
func (bp *BaseProjection) GetName() string
GetName returns the projection name
func (*BaseProjection) GetPosition ¶
func (bp *BaseProjection) GetPosition() int64
GetPosition returns the current projection position
func (*BaseProjection) SetPosition ¶
func (bp *BaseProjection) SetPosition(position int64)
SetPosition sets the projection position
type BaseQuery ¶
type BaseQuery struct { QueryType string `json:"query_type"` Parameters map[string]interface{} `json:"parameters"` CacheKey string `json:"cache_key"` Cacheable bool `json:"cacheable"` UserID string `json:"user_id"` TenantID string `json:"tenant_id"` CorrelationID string `json:"correlation_id"` }
BaseQuery provides a base implementation for queries
func (*BaseQuery) GetCacheKey ¶
GetCacheKey returns the cache key
func (*BaseQuery) GetParameters ¶
GetParameters returns the query parameters
func (*BaseQuery) GetQueryType ¶
GetQueryType returns the query type
func (*BaseQuery) IsCacheable ¶
IsCacheable returns whether the query is cacheable
type BaseReadModel ¶
type BaseReadModel struct { ID string `json:"id"` Type string `json:"type"` Version int `json:"version"` LastUpdated time.Time `json:"last_updated"` TenantID string `json:"tenant_id"` Metadata map[string]interface{} `json:"metadata,omitempty"` }
BaseReadModel provides a base implementation for read models
func (*BaseReadModel) GetID ¶
func (brm *BaseReadModel) GetID() string
GetID returns the read model ID
func (*BaseReadModel) GetLastUpdated ¶
func (brm *BaseReadModel) GetLastUpdated() time.Time
GetLastUpdated returns the last updated timestamp
func (*BaseReadModel) GetType ¶
func (brm *BaseReadModel) GetType() string
GetType returns the read model type
func (*BaseReadModel) GetVersion ¶
func (brm *BaseReadModel) GetVersion() int
GetVersion returns the read model version
type BatchCommand ¶
type BatchCommand struct { Commands []Command `json:"commands"` Transaction bool `json:"transaction"` // If true, all commands must succeed }
BatchCommand represents a batch of commands to execute
type BatchResult ¶
type BatchResult struct { Results []*CommandResult `json:"results"` Success bool `json:"success"` FailedAt int `json:"failed_at,omitempty"` // Index of failed command TotalCount int `json:"total_count"` }
BatchResult represents the result of batch command execution
type CacheStats ¶
type CacheStats struct { Hits int64 `json:"hits"` Misses int64 `json:"misses"` Entries int `json:"entries"` HitRatio float64 `json:"hit_ratio"` }
CacheStats represents cache statistics
type Command ¶
type Command interface { GetCommandType() string GetAggregateID() string GetMetadata() map[string]interface{} }
Command represents a command that can be executed
type CommandBus ¶
type CommandBus struct {
// contains filtered or unexported fields
}
CommandBus handles command dispatching and execution
func NewCommandBus ¶
func NewCommandBus(config CommandBusConfig, logger *logrus.Logger) *CommandBus
NewCommandBus creates a new command bus
func (*CommandBus) Close ¶
func (cb *CommandBus) Close() error
func (*CommandBus) Execute ¶
func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error)
Execute executes a command
func (*CommandBus) ExecuteAsync ¶
func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult
ExecuteAsync executes a command asynchronously
func (*CommandBus) ExecuteBatch ¶
func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error)
ExecuteBatch executes multiple commands in a batch
func (*CommandBus) GetHandlerCount ¶
func (cb *CommandBus) GetHandlerCount() int
GetHandlerCount returns the number of registered handlers
func (*CommandBus) GetRegisteredCommands ¶
func (cb *CommandBus) GetRegisteredCommands() []string
GetRegisteredCommands returns a list of registered command types
func (*CommandBus) HealthCheck ¶
func (cb *CommandBus) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the command bus
func (*CommandBus) RegisterHandler ¶
func (cb *CommandBus) RegisterHandler(handler CommandHandler) error
RegisterHandler registers a command handler
func (*CommandBus) UnregisterHandler ¶
func (cb *CommandBus) UnregisterHandler(commandType string) error
UnregisterHandler removes a command handler
func (*CommandBus) ValidateCommand ¶
func (cb *CommandBus) ValidateCommand(command Command) error
ValidateCommand validates a command before execution
type CommandBusConfig ¶
type CommandBusConfig struct { EnableMetrics bool `json:"enable_metrics"` EnableTracing bool `json:"enable_tracing"` MaxConcurrent int `json:"max_concurrent"` TimeoutDuration int `json:"timeout_duration"` // seconds }
CommandBusConfig holds command bus configuration
type CommandHandler ¶
type CommandHandler interface { Handle(ctx context.Context, command Command) (*CommandResult, error) GetCommandType() string }
CommandHandler defines the interface for command handlers
type CommandResult ¶
type CommandResult struct { AggregateID string `json:"aggregate_id"` EventsEmitted []interface{} `json:"events_emitted"` Success bool `json:"success"` Message string `json:"message,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` Version int `json:"version"` }
CommandResult represents the result of a command execution
type Event ¶ added in v1.0.0
type Event interface { GetEventType() string GetAggregateID() string GetAggregateType() string GetEventData() map[string]interface{} GetMetadata() map[string]interface{} GetTimestamp() time.Time GetVersion() int }
Event represents a domain event
type EventStore ¶
type EventStore interface { // Event operations AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, events []Event) error GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error) GetEventCount(ctx context.Context) (int64, error) // Aggregate operations GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error) // Snapshot operations GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, data interface{}) error }
EventStore defines the complete contract for event storage
type EventStream ¶ added in v1.0.0
type EventStream struct { AggregateID string `json:"aggregate_id"` AggregateType string `json:"aggregate_type"` Events []*StoredEvent `json:"events"` Version int `json:"version"` FromVersion int `json:"from_version"` ToVersion int `json:"to_version"` Timestamp time.Time `json:"timestamp"` FromSnapshot bool `json:"from_snapshot"` }
EventStream represents a stream of events
func (*EventStream) GetEvents ¶ added in v1.0.0
func (es *EventStream) GetEvents() ([]Event, error)
GetEvents returns the events as Event interfaces
type InMemoryQueryCache ¶
type InMemoryQueryCache struct {
// contains filtered or unexported fields
}
InMemoryQueryCache provides an in-memory implementation of QueryCache
func NewInMemoryQueryCache ¶
func NewInMemoryQueryCache() *InMemoryQueryCache
NewInMemoryQueryCache creates a new in-memory query cache
func (*InMemoryQueryCache) Clear ¶
func (imc *InMemoryQueryCache) Clear()
Clear removes all cached entries
func (*InMemoryQueryCache) Delete ¶
func (imc *InMemoryQueryCache) Delete(key string)
Delete removes a cached entry
func (*InMemoryQueryCache) Get ¶
func (imc *InMemoryQueryCache) Get(key string) (*QueryResult, bool)
Get retrieves a cached query result
func (*InMemoryQueryCache) GetStats ¶
func (imc *InMemoryQueryCache) GetStats() CacheStats
GetStats returns cache statistics
func (*InMemoryQueryCache) Set ¶
func (imc *InMemoryQueryCache) Set(key string, result *QueryResult, ttl time.Duration)
Set stores a query result in cache
type PaginatedQuery ¶
type PaginatedQuery struct { *BaseQuery Page int `json:"page"` PageSize int `json:"page_size"` SortBy string `json:"sort_by,omitempty"` SortDir string `json:"sort_dir,omitempty"` // asc, desc }
PaginatedQuery represents a query with pagination parameters
func NewPaginatedQuery ¶
func NewPaginatedQuery(queryType string, page, pageSize int) *PaginatedQuery
NewPaginatedQuery creates a new paginated query
type PaginatedResult ¶
type PaginatedResult struct { *QueryResult Page int `json:"page"` PageSize int `json:"page_size"` TotalCount int `json:"total_count"` TotalPages int `json:"total_pages"` HasNext bool `json:"has_next"` HasPrevious bool `json:"has_previous"` }
PaginatedResult represents a paginated query result
type Projection ¶
type Projection interface { GetName() string GetEventTypes() []string Project(ctx context.Context, event *StoredEvent) (*ProjectionResult, error) Reset(ctx context.Context) error GetPosition() int64 SetPosition(position int64) }
Projection defines the interface for event projections
type ProjectionConfig ¶
type ProjectionConfig struct { BatchSize int `json:"batch_size"` PollInterval time.Duration `json:"poll_interval"` ErrorRetryCount int `json:"error_retry_count"` ErrorRetryDelay time.Duration `json:"error_retry_delay"` EnableMetrics bool `json:"enable_metrics"` }
ProjectionConfig holds projection manager configuration
type ProjectionManager ¶
type ProjectionManager struct {
// contains filtered or unexported fields
}
ProjectionManager manages event projections to read models
func NewProjectionManager ¶
func NewProjectionManager(eventStore EventStore, readStore *ReadModelStore, config ProjectionConfig, logger *logrus.Logger) *ProjectionManager
NewProjectionManager creates a new projection manager
func (*ProjectionManager) Close ¶
func (pm *ProjectionManager) Close() error
Close closes the projection manager
func (*ProjectionManager) GetAllProjectionStates ¶
func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error)
GetAllProjectionStates returns the state of all projections
func (*ProjectionManager) GetProjectionCount ¶
func (pm *ProjectionManager) GetProjectionCount() int
GetProjectionCount returns the number of registered projections
func (*ProjectionManager) GetProjectionState ¶
func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error)
GetProjectionState returns the state of a projection
func (*ProjectionManager) HealthCheck ¶
func (pm *ProjectionManager) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the projection manager
func (*ProjectionManager) IsRunning ¶
func (pm *ProjectionManager) IsRunning() bool
IsRunning returns whether the projection manager is running
func (*ProjectionManager) ProjectEvents ¶
func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error
ProjectEvents manually projects events for a specific projection
func (*ProjectionManager) RebuildProjection ¶
func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error
RebuildProjection rebuilds a projection from scratch
func (*ProjectionManager) RegisterProjection ¶
func (pm *ProjectionManager) RegisterProjection(projection Projection) error
RegisterProjection registers a projection
func (*ProjectionManager) ResetProjection ¶
func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error
ResetProjection resets a projection to start from the beginning
func (*ProjectionManager) Start ¶
func (pm *ProjectionManager) Start(ctx context.Context) error
Start starts the projection manager
func (*ProjectionManager) Stop ¶
func (pm *ProjectionManager) Stop() error
Stop stops the projection manager
func (*ProjectionManager) UnregisterProjection ¶
func (pm *ProjectionManager) UnregisterProjection(name string) error
UnregisterProjection removes a projection
type ProjectionResult ¶
type ProjectionResult struct { ReadModels []ReadModel `json:"read_models"` Success bool `json:"success"` Message string `json:"message,omitempty"` Position int64 `json:"position"` }
ProjectionResult represents the result of a projection
type ProjectionState ¶
type ProjectionState struct { Name string `json:"name"` Position int64 `json:"position"` LastUpdated time.Time `json:"last_updated"` EventsCount int64 `json:"events_count"` ErrorsCount int64 `json:"errors_count"` IsRunning bool `json:"is_running"` LastError string `json:"last_error,omitempty"` }
ProjectionState represents the state of a projection
type Query ¶
type Query interface { GetQueryType() string GetParameters() map[string]interface{} GetCacheKey() string IsCacheable() bool }
Query represents a query that can be executed
type QueryBus ¶
type QueryBus struct {
// contains filtered or unexported fields
}
QueryBus handles query dispatching and execution
func NewQueryBus ¶
func NewQueryBus(config QueryBusConfig, logger *logrus.Logger) *QueryBus
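NewQueryBus creates a new query bus
func (*QueryBus) Close ¶
func (qb *QueryBus) Close() error
func (*QueryBus) Execute ¶
func (qb *QueryBus) Execute(ctx context.Context, query Query) (*QueryResult, error)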
func (*QueryBus) ExecuteAsync ¶
func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult
ExecuteAsync executes a query asynchronously
func (*QueryBus) GetCacheStats ¶
func (qb *QueryBus) GetCacheStats() *CacheStats
GetCacheStats returns cache statistics
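func (*QueryBus) GetHandlerCount ¶
func (qb *QueryBus) GetHandlerCount() int
GetHandlerCount returns the number of registered handlers
func (*QueryBus) GetRegisteredQueries ¶
func (qb *QueryBus) GetRegisteredQueries() []string
GetRegisteredQueries returns a list of registered query types
func (*QueryBus) HealthCheck ¶
func (qb *QueryBus) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the query bus
func (*QueryBus) InvalidateCache ¶
func (qb *QueryBus) InvalidateCache(pattern string)
InvalidateCache invalidates cached results for a specific pattern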
func (*QueryBus) RegisterHandler ¶
func (qb *QueryBus) RegisterHandler(handler QueryHandler) error
RegisterHandler registers a query handler
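func (*QueryBus) UnregisterHandler ¶
func (qb *QueryBus) UnregisterHandler(queryType string) error
UnregisterHandler removes a query handler
func (*QueryBus) ValidateQuery ¶
func (qb *QueryBus) ValidateQuery(query Query) error
ValidateQuery validates a query before execution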
type QueryBusConfig ¶
type QueryBusConfig struct { EnableCache bool `json:"enable_cache"` CacheTTL time.Duration `json:"cache_ttl"` EnableMetrics bool `json:"enable_metrics"` EnableTracing bool `json:"enable_tracing"` MaxConcurrent int `json:"max_concurrent"` TimeoutDuration time.Duration `json:"timeout_duration"` }
QueryBusConfig holds query bus configuration
type QueryCache ¶
type QueryCache interface { Get(key string) (*QueryResult, bool) Set(key string, result *QueryResult, ttl time.Duration) Delete(key string) Clear() GetStats() CacheStats }
QueryCache is the interface for caching query results
type QueryHandler ¶
type QueryHandler interface { Handle(ctx context.Context, query Query) (*QueryResult, error) GetQueryType() string }
QueryHandler defines the interface for query handlers
type QueryResult ¶
type QueryResult struct { Data interface{} `json:"data"` Success bool `json:"success"` Message string `json:"message,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` Count int `json:"count,omitempty"` FromCache bool `json:"from_cache"` Timestamp time.Time `json:"timestamp"` }
QueryResult represents the result of a query execution
type ReadModel ¶
type ReadModel interface { GetID() string GetType() string GetVersion() int GetLastUpdated() time.Time }
ReadModel represents a read-optimized data model
type ReadModelConfig ¶
type ReadModelConfig struct { TablePrefix string `json:"table_prefix"` EnableVersioning bool `json:"enable_versioning"` EnableTTL bool `json:"enable_ttl"` DefaultTTL time.Duration `json:"default_ttl"` BatchSize int `json:"batch_size"` }
ReadModelConfig holds read model configuration
type ReadModelRecord ¶
type ReadModelRecord struct { ID string `gorm:"type:uuid;primary_key" json:"id"` Type string `gorm:"type:varchar(255);not null;index" json:"type"` Data string `gorm:"type:jsonb;not null" json:"data"` Version int `gorm:"not null;default:1" json:"version"` CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"` UpdatedAt time.Time `gorm:"autoUpdateTime" json:"updated_at"` ExpiresAt *time.Time `gorm:"index" json:"expires_at,omitempty"` TenantID string `gorm:"type:uuid;index" json:"tenant_id"` Metadata string `gorm:"type:jsonb" json:"metadata,omitempty"` }
ReadModelRecord represents a stored read model
type ReadModelStore ¶
type ReadModelStore struct {
// contains filtered or unexported fields
}
ReadModelStore handles storage and retrieval of read models
func NewReadModelStore ¶
func NewReadModelStore(db *gorm.DB, config ReadModelConfig, logger *logrus.Logger) (*ReadModelStore, error)
NewReadModelStore creates a new read model store
func (*ReadModelStore) CleanupExpired ¶
func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error)
CleanupExpired removes expired read models
func (*ReadModelStore) Close ¶
func (rms *ReadModelStore) Close() error
Close closes the read model store
func (*ReadModelStore) Delete ¶
func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error
Delete removes a read model
func (*ReadModelStore) DeleteByType ¶
func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error
DeleteByType removes all read models of a specific type
func (*ReadModelStore) Get ¶
func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error
Get retrieves a read model by ID and type
func (*ReadModelStore) GetStats ¶
func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error)
GetStats returns statistics about stored read models
func (*ReadModelStore) List ¶
func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, limit, offset int) ([]*ReadModelRecord, int64, error)
List retrieves multiple read models by type with optional filtering
func (*ReadModelStore) Save ¶
func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error
Save saves a read model to the store
func (*ReadModelStore) SaveWithTTL ¶
func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error
SaveWithTTL saves a read model with a custom TTL
type Snapshot ¶ added in v1.0.0
type Snapshot struct { ID string `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"` AggregateID string `gorm:"not null;index" json:"aggregate_id"` AggregateType string `gorm:"not null;index" json:"aggregate_type"` Version int `gorm:"not null" json:"version"` Data string `gorm:"type:jsonb" json:"data"` Timestamp time.Time `gorm:"not null" json:"timestamp"` CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"` }
Snapshot represents an aggregate snapshot
type StoredEvent ¶ added in v1.0.0
type StoredEvent struct { ID string `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"` EventType string `gorm:"not null;index" json:"event_type"` AggregateID string `gorm:"not null;index" json:"aggregate_id"` AggregateType string `gorm:"not null;index" json:"aggregate_type"` EventData string `gorm:"type:jsonb" json:"event_data"` Metadata string `gorm:"type:jsonb" json:"metadata"` Version int `gorm:"not null;index" json:"version"` Timestamp time.Time `gorm:"not null;index" json:"timestamp"` CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"` CorrelationID string `gorm:"index" json:"correlation_id"` CausationID string `gorm:"index" json:"causation_id"` UserID string `gorm:"index" json:"user_id"` TenantID string `gorm:"index" json:"tenant_id"` }
StoredEvent represents an event as stored in the event store
func FromEvent ¶ added in v1.0.0
func FromEvent(event Event) (*StoredEvent, error)
FromEvent creates a StoredEvent from an Event
func (*StoredEvent) ToEvent ¶ added in v1.0.0
func (se *StoredEvent) ToEvent() (Event, error)
ToEvent converts a StoredEvent to an Event
type UserAggregate ¶
type UserAggregate struct { *BaseAggregate Email string `json:"email"` Name string `json:"name"` IsActive bool `json:"is_active"` DeletedAt *time.Time `json:"deleted_at,omitempty"` }
UserAggregate is an example aggregate
func NewUserAggregate ¶
func NewUserAggregate(id, email, name string) *UserAggregate
NewUserAggregate creates a new user aggregate
func (*UserAggregate) ApplyEvent ¶
func (ua *UserAggregate) ApplyEvent(event Event) error
ApplyEvent applies events to the user aggregate
func (*UserAggregate) ChangeEmail ¶
func (ua *UserAggregate) ChangeEmail(newEmail string, userID string)
ChangeEmail changes the user's email
func (*UserAggregate) Deactivate ¶
func (ua *UserAggregate) Deactivate(userID string, reason string)
Deactivate deactivates the user