 Documentation ¶
    
    
  
    
  
    Index ¶
- func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool)
- type Aggregate
- type AggregateConfig
- type AggregateFactory
- func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error)
- func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error)
- func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error)
- func (af *AggregateFactory) GetRegisteredTypes() []string
- func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{})
 
- type AggregateRepository
- func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error
- func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error)
- func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
- func (ar *AggregateRepository) Load(ctx context.Context, aggregateID, aggregateType string, aggregate Aggregate) error
- func (ar *AggregateRepository) Save(ctx context.Context, aggregate Aggregate) error
 
- type AsyncCommandResult
- type AsyncQueryResult
- type BaseAggregate
- func (ba *BaseAggregate) AddEvent(event Event)
- func (ba *BaseAggregate) ApplyEvent(event Event) error
- func (ba *BaseAggregate) GetID() string
- func (ba *BaseAggregate) GetType() string
- func (ba *BaseAggregate) GetUncommittedEvents() []Event
- func (ba *BaseAggregate) GetVersion() int
- func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error
- func (ba *BaseAggregate) MarkEventsAsCommitted()
 
- type BaseCommand
- type BaseEvent
- func (e *BaseEvent) GetAggregateID() string
- func (e *BaseEvent) GetAggregateType() string
- func (e *BaseEvent) GetEventData() map[string]interface{}
- func (e *BaseEvent) GetEventType() string
- func (e *BaseEvent) GetMetadata() map[string]interface{}
- func (e *BaseEvent) GetTimestamp() time.Time
- func (e *BaseEvent) GetVersion() int
- func (e *BaseEvent) WithMetadata(key string, value interface{}) *BaseEvent
 
- type BaseProjection
- type BaseQuery
- type BaseReadModel
- type BatchCommand
- type BatchResult
- type CacheStats
- type Command
- type CommandBus
- func (cb *CommandBus) Close() error
- func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error)
- func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult
- func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error)
- func (cb *CommandBus) GetHandlerCount() int
- func (cb *CommandBus) GetRegisteredCommands() []string
- func (cb *CommandBus) HealthCheck(ctx context.Context) error
- func (cb *CommandBus) RegisterHandler(handler CommandHandler) error
- func (cb *CommandBus) UnregisterHandler(commandType string) error
- func (cb *CommandBus) ValidateCommand(command Command) error
 
- type CommandBusConfig
- type CommandHandler
- type CommandResult
- type Event
- type EventStore
- type EventStream
- type InMemoryQueryCache
- type PaginatedQuery
- type PaginatedResult
- type Projection
- type ProjectionConfig
- type ProjectionManager
- func (pm *ProjectionManager) Close() error
- func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error)
- func (pm *ProjectionManager) GetProjectionCount() int
- func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error)
- func (pm *ProjectionManager) HealthCheck(ctx context.Context) error
- func (pm *ProjectionManager) IsRunning() bool
- func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error
- func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error
- func (pm *ProjectionManager) RegisterProjection(projection Projection) error
- func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error
- func (pm *ProjectionManager) Start(ctx context.Context) error
- func (pm *ProjectionManager) Stop() error
- func (pm *ProjectionManager) UnregisterProjection(name string) error
 
- type ProjectionResult
- type ProjectionState
- type Query
- type QueryBus
- func (qb *QueryBus) Close() error
- func (qb *QueryBus) Execute(ctx context.Context, query Query) (*QueryResult, error)
- func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult
- func (qb *QueryBus) GetCacheStats() *CacheStats
- func (qb *QueryBus) GetHandlerCount() int
- func (qb *QueryBus) GetRegisteredQueries() []string
- func (qb *QueryBus) HealthCheck(ctx context.Context) error
- func (qb *QueryBus) InvalidateCache(pattern string)
- func (qb *QueryBus) RegisterHandler(handler QueryHandler) error
- func (qb *QueryBus) UnregisterHandler(queryType string) error
- func (qb *QueryBus) ValidateQuery(query Query) error
 
- type QueryBusConfig
- type QueryCache
- type QueryHandler
- type QueryResult
- type ReadModel
- type ReadModelConfig
- type ReadModelRecord
- type ReadModelStore
- func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error)
- func (rms *ReadModelStore) Close() error
- func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error
- func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error
- func (rms *ReadModelStore) Exists(ctx context.Context, id, modelType string) (bool, error)
- func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error
- func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error)
- func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, limit, offset int) ([]*ReadModelRecord, int64, error)
- func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error
- func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error
 
- type Snapshot
- type StoredEvent
- type UserAggregate
Constants ¶
This section is empty.
Variables ¶
This section is empty.
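Functions ¶
func CalculatePagination ¶
func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool)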
Types ¶
type Aggregate ¶
type Aggregate interface {
	GetID() string
	GetType() string
	GetVersion() int
	GetUncommittedEvents() []Event
	MarkEventsAsCommitted()
	LoadFromHistory(events []*StoredEvent) error
	ApplyEvent(event Event) error
}
    Aggregate represents a domain aggregate root
type AggregateConfig ¶
type AggregateConfig struct {
	SnapshotFrequency int  `json:"snapshot_frequency"`
	EnableSnapshots   bool `json:"enable_snapshots"`
	MaxEventCount     int  `json:"max_event_count"`
}
    AggregateConfig holds aggregate repository configuration
type AggregateFactory ¶
type AggregateFactory struct {
	// contains filtered or unexported fields
}
    AggregateFactory creates aggregate instances
func NewAggregateFactory ¶
func NewAggregateFactory(logger *logrus.Logger) *AggregateFactory
NewAggregateFactory creates a new aggregate factory
func (*AggregateFactory) Create ¶
func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error)
Create creates a new aggregate instance
func (*AggregateFactory) CreateNew ¶
func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error)
CreateNew creates a new aggregate with a generated UUID
func (*AggregateFactory) CreateWithID ¶
func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error)
CreateWithID creates a new aggregate instance with a specific ID
func (*AggregateFactory) GetRegisteredTypes ¶
func (af *AggregateFactory) GetRegisteredTypes() []string
GetRegisteredTypes returns all registered aggregate types
func (*AggregateFactory) RegisterType ¶
func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{})
RegisterType registers an aggregate type
type AggregateRepository ¶
type AggregateRepository struct {
	// contains filtered or unexported fields
}
    AggregateRepository handles aggregate persistence and retrieval
func NewAggregateRepository ¶
func NewAggregateRepository(eventStore *EventStore, config AggregateConfig, logger *logrus.Logger) *AggregateRepository
NewAggregateRepository creates a new aggregate repository
func (*AggregateRepository) Delete ¶
func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error
Delete marks an aggregate as deleted (typically by adding a deletion event)
func (*AggregateRepository) Exists ¶
func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error)
Exists checks if an aggregate exists
func (*AggregateRepository) GetVersion ¶
func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
GetVersion returns the current version of an aggregate
type AsyncCommandResult ¶
type AsyncCommandResult struct {
	Result *CommandResult
	Error  error
}
    AsyncCommandResult represents the result of an async command execution
type AsyncQueryResult ¶
type AsyncQueryResult struct {
	Result *QueryResult
	Error  error
}
    AsyncQueryResult represents the result of an async query execution
type BaseAggregate ¶
type BaseAggregate struct {
	ID                string    `json:"id"`
	Type              string    `json:"type"`
	Version           int       `json:"version"`
	UncommittedEvents []Event   `json:"-"`
	CreatedAt         time.Time `json:"created_at"`
	UpdatedAt         time.Time `json:"updated_at"`
	TenantID          string    `json:"tenant_id"`
}
    BaseAggregate provides a base implementation for aggregates
func (*BaseAggregate) AddEvent ¶
func (ba *BaseAggregate) AddEvent(event Event)
AddEvent adds an event to the uncommitted events list
func (*BaseAggregate) ApplyEvent ¶
func (ba *BaseAggregate) ApplyEvent(event Event) error
ApplyEvent applies an event to the aggregate (to be overridden by concrete aggregates)
func (*BaseAggregate) GetID ¶
func (ba *BaseAggregate) GetID() string
GetID returns the aggregate ID
func (*BaseAggregate) GetType ¶
func (ba *BaseAggregate) GetType() string
GetType returns the aggregate type
func (*BaseAggregate) GetUncommittedEvents ¶
func (ba *BaseAggregate) GetUncommittedEvents() []Event
GetUncommittedEvents returns uncommitted events
func (*BaseAggregate) GetVersion ¶
func (ba *BaseAggregate) GetVersion() int
GetVersion returns the aggregate version
func (*BaseAggregate) LoadFromHistory ¶
func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error
LoadFromHistory loads the aggregate from historical events
func (*BaseAggregate) MarkEventsAsCommitted ¶
func (ba *BaseAggregate) MarkEventsAsCommitted()
MarkEventsAsCommitted marks all uncommitted events as committed
type BaseCommand ¶
type BaseCommand struct {
	CommandType   string                 `json:"command_type"`
	AggregateID   string                 `json:"aggregate_id"`
	Metadata      map[string]interface{} `json:"metadata"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
	CorrelationID string                 `json:"correlation_id"`
}
    BaseCommand provides a base implementation for commands
func (*BaseCommand) GetAggregateID ¶
func (bc *BaseCommand) GetAggregateID() string
GetAggregateID returns the aggregate ID
func (*BaseCommand) GetCommandType ¶
func (bc *BaseCommand) GetCommandType() string
GetCommandType returns the command type
func (*BaseCommand) GetMetadata ¶
func (bc *BaseCommand) GetMetadata() map[string]interface{}
GetMetadata returns the command metadata
type BaseEvent ¶ added in v1.0.0
type BaseEvent struct {
	EventType     string                 `json:"event_type"`
	AggregateID   string                 `json:"aggregate_id"`
	AggregateType string                 `json:"aggregate_type"`
	EventData     map[string]interface{} `json:"event_data"`
	Metadata      map[string]interface{} `json:"metadata"`
	Timestamp     time.Time              `json:"timestamp"`
	Version       int                    `json:"version"`
	Data          map[string]interface{} `json:"data"`
	CorrelationID string                 `json:"correlation_id"`
	CausationID   string                 `json:"causation_id"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
}
    BaseEvent provides a basic implementation of Event
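func (*BaseEvent) GetAggregateID ¶ added in v1.0.0
func (e *BaseEvent) GetAggregateID() string
GetAggregateID returns the aggregate ID
func (*BaseEvent) GetAggregateType ¶ added in v1.0.0
func (e *BaseEvent) GetAggregateType() string
GetAggregateType returns the aggregate type
func (*BaseEvent) GetEventData ¶ added in v1.0.0
func (e *BaseEvent) GetEventData() map[string]interface{}
GetEventData returns the event data
func (*BaseEvent) GetEventType ¶ added in v1.0.0
func (e *BaseEvent) GetEventType() string
GetEventType returns the event type
func (*BaseEvent) GetMetadata ¶ added in v1.0.0
func (e *BaseEvent) GetMetadata() map[string]interface{}
GetMetadata returns the event metadata
func (*BaseEvent) GetTimestamp ¶ added in v1.0.0
func (e *BaseEvent) GetTimestamp() time.Time
GetTimestamp returns the event timestamp
func (*BaseEvent) GetVersion ¶ added in v1.0.0
func (e *BaseEvent) GetVersion() int
GetVersion returns the event version
func (*BaseEvent) WithMetadata ¶ added in v1.0.0
func (e *BaseEvent) WithMetadata(key string, value interface{}) *BaseEvent
WithMetadata adds metadata to an event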
type BaseProjection ¶
type BaseProjection struct {
	Name       string   `json:"name"`
	EventTypes []string `json:"event_types"`
	Position   int64    `json:"position"`
	Logger     *logrus.Logger
	ReadStore  *ReadModelStore
}
    BaseProjection provides a base implementation for projections
func (*BaseProjection) GetEventTypes ¶
func (bp *BaseProjection) GetEventTypes() []string
GetEventTypes returns the event types this projection handles
func (*BaseProjection) GetName ¶
func (bp *BaseProjection) GetName() string
GetName returns the projection name
func (*BaseProjection) GetPosition ¶
func (bp *BaseProjection) GetPosition() int64
GetPosition returns the current projection position
func (*BaseProjection) SetPosition ¶
func (bp *BaseProjection) SetPosition(position int64)
SetPosition sets the projection position
type BaseQuery ¶
type BaseQuery struct {
	QueryType     string                 `json:"query_type"`
	Parameters    map[string]interface{} `json:"parameters"`
	CacheKey      string                 `json:"cache_key"`
	Cacheable     bool                   `json:"cacheable"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
	CorrelationID string                 `json:"correlation_id"`
}
    BaseQuery provides a base implementation for queries
func (*BaseQuery) GetCacheKey ¶
GetCacheKey returns the cache key
func (*BaseQuery) GetParameters ¶
GetParameters returns the query parameters
func (*BaseQuery) GetQueryType ¶
GetQueryType returns the query type
func (*BaseQuery) IsCacheable ¶
IsCacheable returns whether the query is cacheable
type BaseReadModel ¶
type BaseReadModel struct {
	ID          string                 `json:"id"`
	Type        string                 `json:"type"`
	Version     int                    `json:"version"`
	LastUpdated time.Time              `json:"last_updated"`
	TenantID    string                 `json:"tenant_id"`
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}
    BaseReadModel provides a base implementation for read models
func (*BaseReadModel) GetID ¶
func (brm *BaseReadModel) GetID() string
GetID returns the read model ID
func (*BaseReadModel) GetLastUpdated ¶
func (brm *BaseReadModel) GetLastUpdated() time.Time
GetLastUpdated returns the last updated timestamp
func (*BaseReadModel) GetType ¶
func (brm *BaseReadModel) GetType() string
GetType returns the read model type
func (*BaseReadModel) GetVersion ¶
func (brm *BaseReadModel) GetVersion() int
GetVersion returns the read model version
type BatchCommand ¶
type BatchCommand struct {
	Commands    []Command `json:"commands"`
	Transaction bool      `json:"transaction"` // If true, all commands must succeed
}
    BatchCommand represents a batch of commands to execute
type BatchResult ¶
type BatchResult struct {
	Results    []*CommandResult `json:"results"`
	Success    bool             `json:"success"`
	FailedAt   int              `json:"failed_at,omitempty"` // Index of failed command
	TotalCount int              `json:"total_count"`
}
    BatchResult represents the result of batch command execution
type CacheStats ¶
type CacheStats struct {
	Hits     int64   `json:"hits"`
	Misses   int64   `json:"misses"`
	Entries  int     `json:"entries"`
	HitRatio float64 `json:"hit_ratio"`
}
    CacheStats represents cache statistics
type Command ¶
type Command interface {
	GetCommandType() string
	GetAggregateID() string
	GetMetadata() map[string]interface{}
}
    Command represents a command that can be executed
type CommandBus ¶
type CommandBus struct {
	// contains filtered or unexported fields
}
    CommandBus handles command dispatching and execution
func NewCommandBus ¶
func NewCommandBus(config CommandBusConfig, logger *logrus.Logger) *CommandBus
NewCommandBus creates a new command bus
func (*CommandBus) Execute ¶
func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error)
Execute executes a command
func (*CommandBus) ExecuteAsync ¶
func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult
ExecuteAsync executes a command asynchronously
func (*CommandBus) ExecuteBatch ¶
func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error)
ExecuteBatch executes multiple commands in a batch
func (*CommandBus) GetHandlerCount ¶
func (cb *CommandBus) GetHandlerCount() int
GetHandlerCount returns the number of registered handlers
func (*CommandBus) GetRegisteredCommands ¶
func (cb *CommandBus) GetRegisteredCommands() []string
GetRegisteredCommands returns a list of registered command types
func (*CommandBus) HealthCheck ¶
func (cb *CommandBus) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the command bus
func (*CommandBus) RegisterHandler ¶
func (cb *CommandBus) RegisterHandler(handler CommandHandler) error
RegisterHandler registers a command handler
func (*CommandBus) UnregisterHandler ¶
func (cb *CommandBus) UnregisterHandler(commandType string) error
UnregisterHandler removes a command handler
func (*CommandBus) ValidateCommand ¶
func (cb *CommandBus) ValidateCommand(command Command) error
ValidateCommand validates a command before execution
type CommandBusConfig ¶
type CommandBusConfig struct {
	EnableMetrics   bool `json:"enable_metrics"`
	EnableTracing   bool `json:"enable_tracing"`
	MaxConcurrent   int  `json:"max_concurrent"`
	TimeoutDuration int  `json:"timeout_duration"` // seconds
}
    CommandBusConfig holds command bus configuration
type CommandHandler ¶
type CommandHandler interface {
	Handle(ctx context.Context, command Command) (*CommandResult, error)
	GetCommandType() string
}
    CommandHandler defines the interface for command handlers
type CommandResult ¶
type CommandResult struct {
	AggregateID   string                 `json:"aggregate_id"`
	EventsEmitted []interface{}          `json:"events_emitted"`
	Success       bool                   `json:"success"`
	Message       string                 `json:"message,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
	Version       int                    `json:"version"`
}
    CommandResult represents the result of a command execution
type Event ¶ added in v1.0.0
type Event interface {
	GetEventType() string
	GetAggregateID() string
	GetAggregateType() string
	GetEventData() map[string]interface{}
	GetMetadata() map[string]interface{}
	GetTimestamp() time.Time
	GetVersion() int
}
    Event represents a domain event
type EventStore ¶
type EventStore interface {
	GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
	GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
	GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
	GetEventCount(ctx context.Context) (int64, error)
}
    EventStore is the interface for accessing events; it matches the event store implementation.
type EventStream ¶ added in v1.0.0
type EventStream struct {
	AggregateID   string         `json:"aggregate_id"`
	AggregateType string         `json:"aggregate_type"`
	Events        []*StoredEvent `json:"events"`
	Version       int            `json:"version"`
	Timestamp     time.Time      `json:"timestamp"`
}
    EventStream represents a stream of events
func (*EventStream) GetEvents ¶ added in v1.0.0
func (es *EventStream) GetEvents() ([]Event, error)
GetEvents returns the events as Event interfaces
type InMemoryQueryCache ¶
type InMemoryQueryCache struct {
	// contains filtered or unexported fields
}
    InMemoryQueryCache provides an in-memory implementation of QueryCache
func NewInMemoryQueryCache ¶
func NewInMemoryQueryCache() *InMemoryQueryCache
NewInMemoryQueryCache creates a new in-memory query cache
func (*InMemoryQueryCache) Clear ¶
func (imc *InMemoryQueryCache) Clear()
Clear removes all cached entries
func (*InMemoryQueryCache) Delete ¶
func (imc *InMemoryQueryCache) Delete(key string)
Delete removes a cached entry
func (*InMemoryQueryCache) Get ¶
func (imc *InMemoryQueryCache) Get(key string) (*QueryResult, bool)
Get retrieves a cached query result
func (*InMemoryQueryCache) GetStats ¶
func (imc *InMemoryQueryCache) GetStats() CacheStats
GetStats returns cache statistics
func (*InMemoryQueryCache) Set ¶
func (imc *InMemoryQueryCache) Set(key string, result *QueryResult, ttl time.Duration)
Set stores a query result in cache
type PaginatedQuery ¶
type PaginatedQuery struct {
	*BaseQuery
	Page     int    `json:"page"`
	PageSize int    `json:"page_size"`
	SortBy   string `json:"sort_by,omitempty"`
	SortDir  string `json:"sort_dir,omitempty"` // asc, desc
}
    PaginatedQuery represents a query with pagination parameters
func NewPaginatedQuery ¶
func NewPaginatedQuery(queryType string, page, pageSize int) *PaginatedQuery
NewPaginatedQuery creates a new paginated query
type PaginatedResult ¶
type PaginatedResult struct {
	*QueryResult
	Page        int  `json:"page"`
	PageSize    int  `json:"page_size"`
	TotalCount  int  `json:"total_count"`
	TotalPages  int  `json:"total_pages"`
	HasNext     bool `json:"has_next"`
	HasPrevious bool `json:"has_previous"`
}
    PaginatedResult represents a paginated query result
type Projection ¶
type Projection interface {
	GetName() string
	GetEventTypes() []string
	Project(ctx context.Context, event *StoredEvent) (*ProjectionResult, error)
	Reset(ctx context.Context) error
	GetPosition() int64
	SetPosition(position int64)
}
    Projection defines the interface for event projections
type ProjectionConfig ¶
type ProjectionConfig struct {
	BatchSize       int           `json:"batch_size"`
	PollInterval    time.Duration `json:"poll_interval"`
	ErrorRetryCount int           `json:"error_retry_count"`
	ErrorRetryDelay time.Duration `json:"error_retry_delay"`
	EnableMetrics   bool          `json:"enable_metrics"`
}
    ProjectionConfig holds projection manager configuration
type ProjectionManager ¶
type ProjectionManager struct {
	// contains filtered or unexported fields
}
    ProjectionManager manages event projections to read models
func NewProjectionManager ¶
func NewProjectionManager(eventStore EventStore, readStore *ReadModelStore, config ProjectionConfig, logger *logrus.Logger) *ProjectionManager
NewProjectionManager creates a new projection manager
func (*ProjectionManager) Close ¶
func (pm *ProjectionManager) Close() error
Close closes the projection manager
func (*ProjectionManager) GetAllProjectionStates ¶
func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error)
GetAllProjectionStates returns the state of all projections
func (*ProjectionManager) GetProjectionCount ¶
func (pm *ProjectionManager) GetProjectionCount() int
GetProjectionCount returns the number of registered projections
func (*ProjectionManager) GetProjectionState ¶
func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error)
GetProjectionState returns the state of a projection
func (*ProjectionManager) HealthCheck ¶
func (pm *ProjectionManager) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the projection manager
func (*ProjectionManager) IsRunning ¶
func (pm *ProjectionManager) IsRunning() bool
IsRunning returns whether the projection manager is running
func (*ProjectionManager) ProjectEvents ¶
func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error
ProjectEvents manually projects events for a specific projection
func (*ProjectionManager) RebuildProjection ¶
func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error
RebuildProjection rebuilds a projection from scratch
func (*ProjectionManager) RegisterProjection ¶
func (pm *ProjectionManager) RegisterProjection(projection Projection) error
RegisterProjection registers a projection
func (*ProjectionManager) ResetProjection ¶
func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error
ResetProjection resets a projection to start from the beginning
func (*ProjectionManager) Start ¶
func (pm *ProjectionManager) Start(ctx context.Context) error
Start starts the projection manager
func (*ProjectionManager) Stop ¶
func (pm *ProjectionManager) Stop() error
Stop stops the projection manager
func (*ProjectionManager) UnregisterProjection ¶
func (pm *ProjectionManager) UnregisterProjection(name string) error
UnregisterProjection removes a projection
type ProjectionResult ¶
type ProjectionResult struct {
	ReadModels []ReadModel `json:"read_models"`
	Success    bool        `json:"success"`
	Message    string      `json:"message,omitempty"`
	Position   int64       `json:"position"`
}
    ProjectionResult represents the result of a projection
type ProjectionState ¶
type ProjectionState struct {
	Name        string    `json:"name"`
	Position    int64     `json:"position"`
	LastUpdated time.Time `json:"last_updated"`
	EventsCount int64     `json:"events_count"`
	ErrorsCount int64     `json:"errors_count"`
	IsRunning   bool      `json:"is_running"`
	LastError   string    `json:"last_error,omitempty"`
}
    ProjectionState represents the state of a projection
type Query ¶
type Query interface {
	GetQueryType() string
	GetParameters() map[string]interface{}
	GetCacheKey() string
	IsCacheable() bool
}
    Query represents a query that can be executed
type QueryBus ¶
type QueryBus struct {
	// contains filtered or unexported fields
}
    QueryBus handles query dispatching and execution
func NewQueryBus ¶
func NewQueryBus(config QueryBusConfig, logger *logrus.Logger) *QueryBus
NewQueryBus creates a new query bus
func (*QueryBus) ExecuteAsync ¶
func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult
ExecuteAsync executes a query asynchronously
func (*QueryBus) GetCacheStats ¶
func (qb *QueryBus) GetCacheStats() *CacheStats
GetCacheStats returns cache statistics
func (*QueryBus) GetHandlerCount ¶
func (qb *QueryBus) GetHandlerCount() int
GetHandlerCount returns the number of registered handlers
func (*QueryBus) GetRegisteredQueries ¶
func (qb *QueryBus) GetRegisteredQueries() []string
GetRegisteredQueries returns a list of registered query types
func (*QueryBus) HealthCheck ¶
func (qb *QueryBus) HealthCheck(ctx context.Context) error
HealthCheck performs a health check on the query bus
func (*QueryBus) InvalidateCache ¶
func (qb *QueryBus) InvalidateCache(pattern string)
InvalidateCache invalidates cached results for a specific pattern
func (*QueryBus) RegisterHandler ¶
func (qb *QueryBus) RegisterHandler(handler QueryHandler) error
RegisterHandler registers a query handler
func (*QueryBus) UnregisterHandler ¶
func (qb *QueryBus) UnregisterHandler(queryType string) error
UnregisterHandler removes a query handler
func (*QueryBus) ValidateQuery ¶
func (qb *QueryBus) ValidateQuery(query Query) error
ValidateQuery validates a query before execution
type QueryBusConfig ¶
type QueryBusConfig struct {
	EnableCache     bool          `json:"enable_cache"`
	CacheTTL        time.Duration `json:"cache_ttl"`
	EnableMetrics   bool          `json:"enable_metrics"`
	EnableTracing   bool          `json:"enable_tracing"`
	MaxConcurrent   int           `json:"max_concurrent"`
	TimeoutDuration time.Duration `json:"timeout_duration"`
}
    QueryBusConfig holds query bus configuration
type QueryCache ¶
type QueryCache interface {
	Get(key string) (*QueryResult, bool)
	Set(key string, result *QueryResult, ttl time.Duration)
	Delete(key string)
	Clear()
	GetStats() CacheStats
}
    QueryCache is the interface for caching query results
type QueryHandler ¶
type QueryHandler interface {
	Handle(ctx context.Context, query Query) (*QueryResult, error)
	GetQueryType() string
}
    QueryHandler defines the interface for query handlers
type QueryResult ¶
type QueryResult struct {
	Data      interface{}            `json:"data"`
	Success   bool                   `json:"success"`
	Message   string                 `json:"message,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	Count     int                    `json:"count,omitempty"`
	FromCache bool                   `json:"from_cache"`
	Timestamp time.Time              `json:"timestamp"`
}
    QueryResult represents the result of a query execution
type ReadModel ¶
type ReadModel interface {
	GetID() string
	GetType() string
	GetVersion() int
	GetLastUpdated() time.Time
}
    ReadModel represents a read-optimized data model
type ReadModelConfig ¶
type ReadModelConfig struct {
	TablePrefix      string        `json:"table_prefix"`
	EnableVersioning bool          `json:"enable_versioning"`
	EnableTTL        bool          `json:"enable_ttl"`
	DefaultTTL       time.Duration `json:"default_ttl"`
	BatchSize        int           `json:"batch_size"`
}
    ReadModelConfig holds read model configuration
type ReadModelRecord ¶
type ReadModelRecord struct {
	ID        string     `gorm:"type:uuid;primary_key" json:"id"`
	Type      string     `gorm:"type:varchar(255);not null;index" json:"type"`
	Data      string     `gorm:"type:jsonb;not null" json:"data"`
	Version   int        `gorm:"not null;default:1" json:"version"`
	CreatedAt time.Time  `gorm:"autoCreateTime" json:"created_at"`
	UpdatedAt time.Time  `gorm:"autoUpdateTime" json:"updated_at"`
	ExpiresAt *time.Time `gorm:"index" json:"expires_at,omitempty"`
	TenantID  string     `gorm:"type:uuid;index" json:"tenant_id"`
	Metadata  string     `gorm:"type:jsonb" json:"metadata,omitempty"`
}
    ReadModelRecord represents a stored read model
type ReadModelStore ¶
type ReadModelStore struct {
	// contains filtered or unexported fields
}
    ReadModelStore handles storage and retrieval of read models
func NewReadModelStore ¶
func NewReadModelStore(db *gorm.DB, config ReadModelConfig, logger *logrus.Logger) (*ReadModelStore, error)
NewReadModelStore creates a new read model store
func (*ReadModelStore) CleanupExpired ¶
func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error)
CleanupExpired removes expired read models
func (*ReadModelStore) Close ¶
func (rms *ReadModelStore) Close() error
Close closes the read model store
func (*ReadModelStore) Delete ¶
func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error
Delete removes a read model
func (*ReadModelStore) DeleteByType ¶
func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error
DeleteByType removes all read models of a specific type
func (*ReadModelStore) Get ¶
func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error
Get retrieves a read model by ID and type
func (*ReadModelStore) GetStats ¶
func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error)
GetStats returns statistics about stored read models
func (*ReadModelStore) List ¶
func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, limit, offset int) ([]*ReadModelRecord, int64, error)
List retrieves multiple read models by type with optional filtering
func (*ReadModelStore) Save ¶
func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error
Save saves a read model to the store
func (*ReadModelStore) SaveWithTTL ¶
func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error
SaveWithTTL saves a read model with a custom TTL
type Snapshot ¶ added in v1.0.0
type Snapshot struct {
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	AggregateID   string    `gorm:"not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"not null;index" json:"aggregate_type"`
	Version       int       `gorm:"not null" json:"version"`
	Data          string    `gorm:"type:jsonb" json:"data"`
	Timestamp     time.Time `gorm:"not null" json:"timestamp"`
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
}
    Snapshot represents an aggregate snapshot
type StoredEvent ¶ added in v1.0.0
type StoredEvent struct {
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	EventType     string    `gorm:"not null;index" json:"event_type"`
	AggregateID   string    `gorm:"not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"not null;index" json:"aggregate_type"`
	EventData     string    `gorm:"type:jsonb" json:"event_data"`
	Metadata      string    `gorm:"type:jsonb" json:"metadata"`
	Version       int       `gorm:"not null;index" json:"version"`
	Timestamp     time.Time `gorm:"not null;index" json:"timestamp"`
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
	CorrelationID string    `gorm:"index" json:"correlation_id"`
	CausationID   string    `gorm:"index" json:"causation_id"`
	UserID        string    `gorm:"index" json:"user_id"`
	TenantID      string    `gorm:"index" json:"tenant_id"`
}
    StoredEvent represents an event as stored in the event store
func FromEvent ¶ added in v1.0.0
func FromEvent(event Event) (*StoredEvent, error)
FromEvent creates a StoredEvent from an Event
func (*StoredEvent) ToEvent ¶ added in v1.0.0
func (se *StoredEvent) ToEvent() (Event, error)
ToEvent converts a StoredEvent to an Event
type UserAggregate ¶
type UserAggregate struct {
	*BaseAggregate
	Email     string     `json:"email"`
	Name      string     `json:"name"`
	IsActive  bool       `json:"is_active"`
	DeletedAt *time.Time `json:"deleted_at,omitempty"`
}
    UserAggregate is an example aggregate
func NewUserAggregate ¶
func NewUserAggregate(id, email, name string) *UserAggregate
NewUserAggregate creates a new user aggregate
func (*UserAggregate) ApplyEvent ¶
func (ua *UserAggregate) ApplyEvent(event Event) error
ApplyEvent applies events to the user aggregate
func (*UserAggregate) ChangeEmail ¶
func (ua *UserAggregate) ChangeEmail(newEmail string, userID string)
ChangeEmail changes the user's email
func (*UserAggregate) Deactivate ¶
func (ua *UserAggregate) Deactivate(userID string, reason string)
Deactivate deactivates the user