Documentation ¶
Index ¶
- func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool)
- type Aggregate
- type AggregateConfig
- type AggregateFactory
- func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error)
- func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error)
- func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error)
- func (af *AggregateFactory) GetRegisteredTypes() []string
- func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{})
- type AggregateRepository
- func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error
- func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error)
- func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
- func (ar *AggregateRepository) Load(ctx context.Context, aggregateID, aggregateType string, aggregate Aggregate) error
- func (ar *AggregateRepository) Save(ctx context.Context, aggregate Aggregate) error
- type AsyncCommandResult
- type AsyncQueryResult
- type BaseAggregate
- func (ba *BaseAggregate) AddEvent(event Event)
- func (ba *BaseAggregate) ApplyEvent(event Event) error
- func (ba *BaseAggregate) GetID() string
- func (ba *BaseAggregate) GetType() string
- func (ba *BaseAggregate) GetUncommittedEvents() []Event
- func (ba *BaseAggregate) GetVersion() int
- func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error
- func (ba *BaseAggregate) MarkEventsAsCommitted()
- type BaseCommand
- type BaseProjection
- type BaseQuery
- type BaseReadModel
- type BatchCommand
- type BatchResult
- type CacheStats
- type Command
- type CommandBus
- func (cb *CommandBus) Close() error
- func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error)
- func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult
- func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error)
- func (cb *CommandBus) GetHandlerCount() int
- func (cb *CommandBus) GetRegisteredCommands() []string
- func (cb *CommandBus) HealthCheck(ctx context.Context) error
- func (cb *CommandBus) RegisterHandler(handler CommandHandler) error
- func (cb *CommandBus) UnregisterHandler(commandType string) error
- func (cb *CommandBus) ValidateCommand(command Command) error
- type CommandBusConfig
- type CommandHandler
- type CommandResult
- type EventStore
- type InMemoryQueryCache
- type PaginatedQuery
- type PaginatedResult
- type Projection
- type ProjectionConfig
- type ProjectionManager
- func (pm *ProjectionManager) Close() error
- func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error)
- func (pm *ProjectionManager) GetProjectionCount() int
- func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error)
- func (pm *ProjectionManager) HealthCheck(ctx context.Context) error
- func (pm *ProjectionManager) IsRunning() bool
- func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error
- func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error
- func (pm *ProjectionManager) RegisterProjection(projection Projection) error
- func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error
- func (pm *ProjectionManager) Start(ctx context.Context) error
- func (pm *ProjectionManager) Stop() error
- func (pm *ProjectionManager) UnregisterProjection(name string) error
- type ProjectionResult
- type ProjectionState
- type Query
- type QueryBus
- func (qb *QueryBus) Close() error
- func (qb *QueryBus) Execute(ctx context.Context, query Query) (*QueryResult, error)
- func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult
- func (qb *QueryBus) GetCacheStats() *CacheStats
- func (qb *QueryBus) GetHandlerCount() int
- func (qb *QueryBus) GetRegisteredQueries() []string
- func (qb *QueryBus) HealthCheck(ctx context.Context) error
- func (qb *QueryBus) InvalidateCache(pattern string)
- func (qb *QueryBus) RegisterHandler(handler QueryHandler) error
- func (qb *QueryBus) UnregisterHandler(queryType string) error
- func (qb *QueryBus) ValidateQuery(query Query) error
- type QueryBusConfig
- type QueryCache
- type QueryHandler
- type QueryResult
- type ReadModel
- type ReadModelConfig
- type ReadModelRecord
- type ReadModelStore
- func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error)
- func (rms *ReadModelStore) Close() error
- func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error
- func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error
- func (rms *ReadModelStore) Exists(ctx context.Context, id, modelType string) (bool, error)
- func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error
- func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error)
- func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, limit, offset int) ([]*ReadModelRecord, int64, error)
- func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error
- func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error
- type UserAggregate
Constants ¶
This section is empty.
Variables ¶
This section is empty.
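Functions ¶
func CalculatePagination ¶
func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool)
(The description of CalculatePagination is missing from this page. Its three results presumably correspond to the TotalPages, HasNext, and HasPrevious fields of PaginatedResult; confirm against the source.)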
Types ¶
type Aggregate ¶
type Aggregate interface {
GetID() string
GetType() string
GetVersion() int
GetUncommittedEvents() []Event
MarkEventsAsCommitted()
LoadFromHistory(events []*StoredEvent) error
ApplyEvent(event Event) error
}
Aggregate represents a domain aggregate root
type AggregateConfig ¶
type AggregateConfig struct {
SnapshotFrequency int `json:"snapshot_frequency"`
EnableSnapshots bool `json:"enable_snapshots"`
MaxEventCount int `json:"max_event_count"`
}
AggregateConfig holds aggregate repository configuration
type AggregateFactory ¶
type AggregateFactory struct {
// contains filtered or unexported fields
}
AggregateFactory creates aggregate instances
func NewAggregateFactory ¶
func NewAggregateFactory(logger *logrus.Logger) *AggregateFactory
NewAggregateFactory creates a new aggregate factory
func (*AggregateFactory) Create ¶
func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error)
Create creates a new aggregate instance
func (*AggregateFactory) CreateNew ¶
func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error)
CreateNew creates a new aggregate with a generated UUID
func (*AggregateFactory) CreateWithID ¶
func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error)
CreateWithID creates a new aggregate instance with a specific ID
func (*AggregateFactory) GetRegisteredTypes ¶
func (af *AggregateFactory) GetRegisteredTypes() []string
GetRegisteredTypes returns all registered aggregate types
func (*AggregateFactory) RegisterType ¶
func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{})
RegisterType registers an aggregate type
type AggregateRepository ¶
type AggregateRepository struct {
// contains filtered or unexported fields
}
AggregateRepository handles aggregate persistence and retrieval
func NewAggregateRepository ¶
func NewAggregateRepository(eventStore *EventStore, config AggregateConfig, logger *logrus.Logger) *AggregateRepository
NewAggregateRepository creates a new aggregate repository
func (*AggregateRepository) Delete ¶
func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error
Delete marks an aggregate as deleted (typically by adding a deletion event)
func (*AggregateRepository) Exists ¶
func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error)
Exists checks if an aggregate exists
func (*AggregateRepository) GetVersion ¶
func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
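GetVersion returns the current version of an aggregate
func (*AggregateRepository) Load ¶
func (ar *AggregateRepository) Load(ctx context.Context, aggregateID, aggregateType string, aggregate Aggregate) error
func (*AggregateRepository) Save ¶
func (ar *AggregateRepository) Save(ctx context.Context, aggregate Aggregate) error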
type AsyncCommandResult ¶
type AsyncCommandResult struct {
Result *CommandResult
Error error
}
AsyncCommandResult represents the result of an async command execution
type AsyncQueryResult ¶
type AsyncQueryResult struct {
Result *QueryResult
Error error
}
AsyncQueryResult represents the result of an async query execution
type BaseAggregate ¶
type BaseAggregate struct {
ID string `json:"id"`
Type string `json:"type"`
Version int `json:"version"`
UncommittedEvents []Event `json:"-"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
TenantID string `json:"tenant_id"`
}
BaseAggregate provides a base implementation for aggregates
func (*BaseAggregate) AddEvent ¶
func (ba *BaseAggregate) AddEvent(event Event)
AddEvent adds an event to the uncommitted events list
func (*BaseAggregate) ApplyEvent ¶
func (ba *BaseAggregate) ApplyEvent(event Event) error
ApplyEvent applies an event to the aggregate (to be overridden by concrete aggregates)
func (*BaseAggregate) GetID ¶
func (ba *BaseAggregate) GetID() string
GetID returns the aggregate ID
func (*BaseAggregate) GetType ¶
func (ba *BaseAggregate) GetType() string
GetType returns the aggregate type
func (*BaseAggregate) GetUncommittedEvents ¶
func (ba *BaseAggregate) GetUncommittedEvents() []Event
GetUncommittedEvents returns uncommitted events
func (*BaseAggregate) GetVersion ¶
func (ba *BaseAggregate) GetVersion() int
GetVersion returns the aggregate version
func (*BaseAggregate) LoadFromHistory ¶
func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error
LoadFromHistory loads the aggregate from historical events
func (*BaseAggregate) MarkEventsAsCommitted ¶
func (ba *BaseAggregate) MarkEventsAsCommitted()
MarkEventsAsCommitted marks all uncommitted events as committed
type BaseCommand ¶
type BaseCommand struct {
CommandType string `json:"command_type"`
AggregateID string `json:"aggregate_id"`
Metadata map[string]interface{} `json:"metadata"`
UserID string `json:"user_id"`
TenantID string `json:"tenant_id"`
CorrelationID string `json:"correlation_id"`
}
BaseCommand provides a base implementation for commands
func (*BaseCommand) GetAggregateID ¶
func (bc *BaseCommand) GetAggregateID() string
GetAggregateID returns the aggregate ID
func (*BaseCommand) GetCommandType ¶
func (bc *BaseCommand) GetCommandType() string
GetCommandType returns the command type
func (*BaseCommand) GetMetadata ¶
func (bc *BaseCommand) GetMetadata() map[string]interface{}
GetMetadata returns the command metadata
type BaseProjection ¶
type BaseProjection struct {
Name string `json:"name"`
EventTypes []string `json:"event_types"`
Position int64 `json:"position"`
Logger *logrus.Logger
ReadStore *ReadModelStore
}
BaseProjection provides a base implementation for projections
func (*BaseProjection) GetEventTypes ¶
func (bp *BaseProjection) GetEventTypes() []string
GetEventTypes returns the event types this projection handles
func (*BaseProjection) GetName ¶
func (bp *BaseProjection) GetName() string
GetName returns the projection name
func (*BaseProjection) GetPosition ¶
func (bp *BaseProjection) GetPosition() int64
GetPosition returns the current projection position
func (*BaseProjection) SetPosition ¶
func (bp *BaseProjection) SetPosition(position int64)
SetPosition sets the projection position
type BaseQuery ¶
type BaseQuery struct {
QueryType string `json:"query_type"`
Parameters map[string]interface{} `json:"parameters"`
CacheKey string `json:"cache_key"`
Cacheable bool `json:"cacheable"`
UserID string `json:"user_id"`
TenantID string `json:"tenant_id"`
CorrelationID string `json:"correlation_id"`
}
BaseQuery provides a base implementation for queries
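func (*BaseQuery) GetCacheKey ¶
func (bq *BaseQuery) GetCacheKey() string
GetCacheKey returns the cache key
func (*BaseQuery) GetParameters ¶
func (bq *BaseQuery) GetParameters() map[string]interface{}
GetParameters returns the query parameters
func (*BaseQuery) GetQueryType ¶
func (bq *BaseQuery) GetQueryType() string
GetQueryType returns the query type
func (*BaseQuery) IsCacheable ¶
func (bq *BaseQuery) IsCacheable() bool
IsCacheable returns whether the query is cacheable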
type BaseReadModel ¶
type BaseReadModel struct {
ID string `json:"id"`
Type string `json:"type"`
Version int `json:"version"`
LastUpdated time.Time `json:"last_updated"`
TenantID string `json:"tenant_id"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
BaseReadModel provides a base implementation for read models
func (*BaseReadModel) GetID ¶
func (brm *BaseReadModel) GetID() string
GetID returns the read model ID
func (*BaseReadModel) GetLastUpdated ¶
func (brm *BaseReadModel) GetLastUpdated() time.Time
GetLastUpdated returns the last updated timestamp
func (*BaseReadModel) GetType ¶
func (brm *BaseReadModel) GetType() string
GetType returns the read model type
func (*BaseReadModel) GetVersion ¶
func (brm *BaseReadModel) GetVersion() int
GetVersion returns the read model version
type BatchCommand ¶
type BatchCommand struct {
Commands []Command `json:"commands"`
Transaction bool `json:"transaction"` // If true, all commands must succeed
}
BatchCommand represents a batch of commands to execute
type BatchResult ¶
type BatchResult struct {
Results []*CommandResult `json:"results"`
Success bool `json:"success"`
FailedAt int `json:"failed_at,omitempty"` // Index of failed command
TotalCount int `json:"total_count"`
}
BatchResult represents the result of batch command execution
type CacheStats ¶
type CacheStats struct {
Hits int64 `json:"hits"`
Misses int64 `json:"misses"`
Entries int `json:"entries"`
HitRatio float64 `json:"hit_ratio"`
}
CacheStats represents cache statistics
type Command ¶
type Command interface {
GetCommandType() string
GetAggregateID() string
GetMetadata() map[string]interface{}
}
Command represents a command that can be executed
type CommandBus ¶
type CommandBus struct {
// contains filtered or unexported fields
}
CommandBus handles command dispatching and execution
func NewCommandBus ¶
func NewCommandBus(config CommandBusConfig, logger *logrus.Logger) *CommandBus
NewCommandBus creates a new command bus
func (*CommandBus) Close ¶
func (cb *CommandBus) Close() error
func (*CommandBus) Execute ¶
func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error)
Execute executes a command
func (*CommandBus) ExecuteAsync ¶
func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult
ExecuteAsync executes a command asynchronously
func (*CommandBus) ExecuteBatch ¶
func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error)
ExecuteBatch executes multiple commands in a batch
func (*CommandBus) GetHandlerCount ¶
func (cb *CommandBus) GetHandlerCount() int
GetHandlerCount returns the number of registered handlers
func (*CommandBus) GetRegisteredCommands ¶
func (cb *CommandBus) GetRegisteredCommands() []string
GetRegisteredCommands returns a list of registered command types
func (*CommandBus) HealthCheck ¶
func (cb *CommandBus) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the command bus
func (*CommandBus) RegisterHandler ¶
func (cb *CommandBus) RegisterHandler(handler CommandHandler) error
RegisterHandler registers a command handler
func (*CommandBus) UnregisterHandler ¶
func (cb *CommandBus) UnregisterHandler(commandType string) error
UnregisterHandler removes a command handler
func (*CommandBus) ValidateCommand ¶
func (cb *CommandBus) ValidateCommand(command Command) error
ValidateCommand validates a command before execution
type CommandBusConfig ¶
type CommandBusConfig struct {
EnableMetrics bool `json:"enable_metrics"`
EnableTracing bool `json:"enable_tracing"`
MaxConcurrent int `json:"max_concurrent"`
TimeoutDuration int `json:"timeout_duration"` // seconds
}
CommandBusConfig holds command bus configuration
type CommandHandler ¶
type CommandHandler interface {
Handle(ctx context.Context, command Command) (*CommandResult, error)
GetCommandType() string
}
CommandHandler defines the interface for command handlers
type CommandResult ¶
type CommandResult struct {
AggregateID string `json:"aggregate_id"`
EventsEmitted []interface{} `json:"events_emitted"`
Success bool `json:"success"`
Message string `json:"message,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Version int `json:"version"`
}
CommandResult represents the result of a command execution
type EventStore ¶
type EventStore interface {
GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
GetEventCount(ctx context.Context) (int64, error)
}
EventStore is the interface for accessing events (it matches the event store implementation)
type InMemoryQueryCache ¶
type InMemoryQueryCache struct {
// contains filtered or unexported fields
}
InMemoryQueryCache provides an in-memory implementation of QueryCache
func NewInMemoryQueryCache ¶
func NewInMemoryQueryCache() *InMemoryQueryCache
NewInMemoryQueryCache creates a new in-memory query cache
func (*InMemoryQueryCache) Clear ¶
func (imc *InMemoryQueryCache) Clear()
Clear removes all cached entries
func (*InMemoryQueryCache) Delete ¶
func (imc *InMemoryQueryCache) Delete(key string)
Delete removes a cached entry
func (*InMemoryQueryCache) Get ¶
func (imc *InMemoryQueryCache) Get(key string) (*QueryResult, bool)
Get retrieves a cached query result
func (*InMemoryQueryCache) GetStats ¶
func (imc *InMemoryQueryCache) GetStats() CacheStats
GetStats returns cache statistics
func (*InMemoryQueryCache) Set ¶
func (imc *InMemoryQueryCache) Set(key string, result *QueryResult, ttl time.Duration)
Set stores a query result in cache
type PaginatedQuery ¶
type PaginatedQuery struct {
*BaseQuery
Page int `json:"page"`
PageSize int `json:"page_size"`
SortBy string `json:"sort_by,omitempty"`
SortDir string `json:"sort_dir,omitempty"` // asc, desc
}
PaginatedQuery represents a query with pagination parameters
func NewPaginatedQuery ¶
func NewPaginatedQuery(queryType string, page, pageSize int) *PaginatedQuery
NewPaginatedQuery creates a new paginated query
type PaginatedResult ¶
type PaginatedResult struct {
*QueryResult
Page int `json:"page"`
PageSize int `json:"page_size"`
TotalCount int `json:"total_count"`
TotalPages int `json:"total_pages"`
HasNext bool `json:"has_next"`
HasPrevious bool `json:"has_previous"`
}
PaginatedResult represents a paginated query result
type Projection ¶
type Projection interface {
GetName() string
GetEventTypes() []string
Project(ctx context.Context, event *StoredEvent) (*ProjectionResult, error)
Reset(ctx context.Context) error
GetPosition() int64
SetPosition(position int64)
}
Projection defines the interface for event projections
type ProjectionConfig ¶
type ProjectionConfig struct {
BatchSize int `json:"batch_size"`
PollInterval time.Duration `json:"poll_interval"`
ErrorRetryCount int `json:"error_retry_count"`
ErrorRetryDelay time.Duration `json:"error_retry_delay"`
EnableMetrics bool `json:"enable_metrics"`
}
ProjectionConfig holds projection manager configuration
type ProjectionManager ¶
type ProjectionManager struct {
// contains filtered or unexported fields
}
ProjectionManager manages event projections to read models
func NewProjectionManager ¶
func NewProjectionManager(eventStore EventStore, readStore *ReadModelStore, config ProjectionConfig, logger *logrus.Logger) *ProjectionManager
NewProjectionManager creates a new projection manager
func (*ProjectionManager) Close ¶
func (pm *ProjectionManager) Close() error
Close closes the projection manager
func (*ProjectionManager) GetAllProjectionStates ¶
func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error)
GetAllProjectionStates returns the state of all projections
func (*ProjectionManager) GetProjectionCount ¶
func (pm *ProjectionManager) GetProjectionCount() int
GetProjectionCount returns the number of registered projections
func (*ProjectionManager) GetProjectionState ¶
func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error)
GetProjectionState returns the state of a projection
func (*ProjectionManager) HealthCheck ¶
func (pm *ProjectionManager) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the projection manager
func (*ProjectionManager) IsRunning ¶
func (pm *ProjectionManager) IsRunning() bool
IsRunning returns whether the projection manager is running
func (*ProjectionManager) ProjectEvents ¶
func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error
ProjectEvents manually projects events for a specific projection
func (*ProjectionManager) RebuildProjection ¶
func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error
RebuildProjection rebuilds a projection from scratch
func (*ProjectionManager) RegisterProjection ¶
func (pm *ProjectionManager) RegisterProjection(projection Projection) error
RegisterProjection registers a projection
func (*ProjectionManager) ResetProjection ¶
func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error
ResetProjection resets a projection to start from the beginning
func (*ProjectionManager) Start ¶
func (pm *ProjectionManager) Start(ctx context.Context) error
Start starts the projection manager
func (*ProjectionManager) Stop ¶
func (pm *ProjectionManager) Stop() error
Stop stops the projection manager
func (*ProjectionManager) UnregisterProjection ¶
func (pm *ProjectionManager) UnregisterProjection(name string) error
UnregisterProjection removes a projection
type ProjectionResult ¶
type ProjectionResult struct {
ReadModels []ReadModel `json:"read_models"`
Success bool `json:"success"`
Message string `json:"message,omitempty"`
Position int64 `json:"position"`
}
ProjectionResult represents the result of a projection
type ProjectionState ¶
type ProjectionState struct {
Name string `json:"name"`
Position int64 `json:"position"`
LastUpdated time.Time `json:"last_updated"`
EventsCount int64 `json:"events_count"`
ErrorsCount int64 `json:"errors_count"`
IsRunning bool `json:"is_running"`
LastError string `json:"last_error,omitempty"`
}
ProjectionState represents the state of a projection
type Query ¶
type Query interface {
GetQueryType() string
GetParameters() map[string]interface{}
GetCacheKey() string
IsCacheable() bool
}
Query represents a query that can be executed
type QueryBus ¶
type QueryBus struct {
// contains filtered or unexported fields
}
QueryBus handles query dispatching and execution
func NewQueryBus ¶
func NewQueryBus(config QueryBusConfig, logger *logrus.Logger) *QueryBus
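NewQueryBus creates a new query bus
func (*QueryBus) Close ¶
func (qb *QueryBus) Close() error
func (*QueryBus) Execute ¶
func (qb *QueryBus) Execute(ctx context.Context, query Query) (*QueryResult, error)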
func (*QueryBus) ExecuteAsync ¶
func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult
ExecuteAsync executes a query asynchronously
func (*QueryBus) GetCacheStats ¶
func (qb *QueryBus) GetCacheStats() *CacheStats
GetCacheStats returns cache statistics
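func (*QueryBus) GetHandlerCount ¶
func (qb *QueryBus) GetHandlerCount() int
GetHandlerCount returns the number of registered handlers
func (*QueryBus) GetRegisteredQueries ¶
func (qb *QueryBus) GetRegisteredQueries() []string
GetRegisteredQueries returns a list of registered query types
func (*QueryBus) HealthCheck ¶
func (qb *QueryBus) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the query bus
func (*QueryBus) InvalidateCache ¶
func (qb *QueryBus) InvalidateCache(pattern string)
InvalidateCache invalidates cached results for a specific pattern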
func (*QueryBus) RegisterHandler ¶
func (qb *QueryBus) RegisterHandler(handler QueryHandler) error
RegisterHandler registers a query handler
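func (*QueryBus) UnregisterHandler ¶
func (qb *QueryBus) UnregisterHandler(queryType string) error
UnregisterHandler removes a query handler
func (*QueryBus) ValidateQuery ¶
func (qb *QueryBus) ValidateQuery(query Query) error
ValidateQuery validates a query before execution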
type QueryBusConfig ¶
type QueryBusConfig struct {
EnableCache bool `json:"enable_cache"`
CacheTTL time.Duration `json:"cache_ttl"`
EnableMetrics bool `json:"enable_metrics"`
EnableTracing bool `json:"enable_tracing"`
MaxConcurrent int `json:"max_concurrent"`
TimeoutDuration time.Duration `json:"timeout_duration"`
}
QueryBusConfig holds query bus configuration
type QueryCache ¶
type QueryCache interface {
Get(key string) (*QueryResult, bool)
Set(key string, result *QueryResult, ttl time.Duration)
Delete(key string)
Clear()
GetStats() CacheStats
}
QueryCache is the interface for caching query results
type QueryHandler ¶
type QueryHandler interface {
Handle(ctx context.Context, query Query) (*QueryResult, error)
GetQueryType() string
}
QueryHandler defines the interface for query handlers
type QueryResult ¶
type QueryResult struct {
Data interface{} `json:"data"`
Success bool `json:"success"`
Message string `json:"message,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Count int `json:"count,omitempty"`
FromCache bool `json:"from_cache"`
Timestamp time.Time `json:"timestamp"`
}
QueryResult represents the result of a query execution
type ReadModel ¶
type ReadModel interface {
GetID() string
GetType() string
GetVersion() int
GetLastUpdated() time.Time
}
ReadModel represents a read-optimized data model
type ReadModelConfig ¶
type ReadModelConfig struct {
TablePrefix string `json:"table_prefix"`
EnableVersioning bool `json:"enable_versioning"`
EnableTTL bool `json:"enable_ttl"`
DefaultTTL time.Duration `json:"default_ttl"`
BatchSize int `json:"batch_size"`
}
ReadModelConfig holds read model configuration
type ReadModelRecord ¶
type ReadModelRecord struct {
ID string `gorm:"type:uuid;primary_key" json:"id"`
Type string `gorm:"type:varchar(255);not null;index" json:"type"`
Data string `gorm:"type:jsonb;not null" json:"data"`
Version int `gorm:"not null;default:1" json:"version"`
CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"`
UpdatedAt time.Time `gorm:"autoUpdateTime" json:"updated_at"`
ExpiresAt *time.Time `gorm:"index" json:"expires_at,omitempty"`
TenantID string `gorm:"type:uuid;index" json:"tenant_id"`
Metadata string `gorm:"type:jsonb" json:"metadata,omitempty"`
}
ReadModelRecord represents a stored read model
type ReadModelStore ¶
type ReadModelStore struct {
// contains filtered or unexported fields
}
ReadModelStore handles storage and retrieval of read models
func NewReadModelStore ¶
func NewReadModelStore(db *gorm.DB, config ReadModelConfig, logger *logrus.Logger) (*ReadModelStore, error)
NewReadModelStore creates a new read model store
func (*ReadModelStore) CleanupExpired ¶
func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error)
CleanupExpired removes expired read models
func (*ReadModelStore) Close ¶
func (rms *ReadModelStore) Close() error
Close closes the read model store
func (*ReadModelStore) Delete ¶
func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error
Delete removes a read model
func (*ReadModelStore) DeleteByType ¶
func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error
DeleteByType removes all read models of a specific type
func (*ReadModelStore) Get ¶
func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error
Get retrieves a read model by ID and type
func (*ReadModelStore) GetStats ¶
func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error)
GetStats returns statistics about stored read models
func (*ReadModelStore) List ¶
func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, limit, offset int) ([]*ReadModelRecord, int64, error)
List retrieves multiple read models by type with optional filtering
func (*ReadModelStore) Save ¶
func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error
Save saves a read model to the store
func (*ReadModelStore) SaveWithTTL ¶
func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error
SaveWithTTL saves a read model with a custom TTL
type UserAggregate ¶
type UserAggregate struct {
*BaseAggregate
Email string `json:"email"`
Name string `json:"name"`
IsActive bool `json:"is_active"`
DeletedAt *time.Time `json:"deleted_at,omitempty"`
}
UserAggregate is an example aggregate
func NewUserAggregate ¶
func NewUserAggregate(id, email, name string) *UserAggregate
NewUserAggregate creates a new user aggregate
func (*UserAggregate) ApplyEvent ¶
func (ua *UserAggregate) ApplyEvent(event Event) error
ApplyEvent applies events to the user aggregate
func (*UserAggregate) ChangeEmail ¶
func (ua *UserAggregate) ChangeEmail(newEmail string, userID string)
ChangeEmail changes the user's email
func (*UserAggregate) Deactivate ¶
func (ua *UserAggregate) Deactivate(userID string, reason string)
Deactivate deactivates the user