cqrs

package
v1.0.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CalculatePagination

func CalculatePagination(page, pageSize, totalCount int) (int, bool, bool)

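CalculatePagination calculates pagination metadata from page, pageSize and totalCount. The meaning of the three results is not documented here; they presumably correspond to the TotalPages, HasNext and HasPrevious fields of PaginatedResult (confirm against the implementation).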

Types

type Aggregate

// Aggregate represents a domain aggregate root.
//
// AggregateRepository.Save, Load and Delete accept values of this interface.
type Aggregate interface {
	// GetID returns the aggregate ID.
	GetID() string
	// GetType returns the aggregate type.
	GetType() string
	// GetVersion returns the aggregate version.
	GetVersion() int
	// GetUncommittedEvents returns the events that have not yet been
	// marked as committed.
	GetUncommittedEvents() []Event
	// MarkEventsAsCommitted marks all uncommitted events as committed.
	MarkEventsAsCommitted()
	// LoadFromHistory loads the aggregate from historical events.
	LoadFromHistory(events []*StoredEvent) error
	// ApplyEvent applies a single event to the aggregate.
	ApplyEvent(event Event) error
}

Aggregate represents a domain aggregate root

type AggregateConfig

// AggregateConfig holds aggregate repository configuration. It is passed by
// value to NewAggregateRepository.
type AggregateConfig struct {
	// NOTE(review): unit is not shown here — presumably the number of
	// events between snapshots; confirm against the repository code.
	SnapshotFrequency int  `json:"snapshot_frequency"`
	EnableSnapshots   bool `json:"enable_snapshots"`
	// NOTE(review): semantics not visible here (a cap on events per
	// aggregate or per load?) — confirm before relying on it.
	MaxEventCount     int  `json:"max_event_count"`
}

AggregateConfig holds aggregate repository configuration

type AggregateFactory

type AggregateFactory struct {
	// contains filtered or unexported fields
}

AggregateFactory creates aggregate instances

func NewAggregateFactory

func NewAggregateFactory(logger *logrus.Logger) *AggregateFactory

NewAggregateFactory creates a new aggregate factory

func (*AggregateFactory) Create

func (af *AggregateFactory) Create(aggregateType string) (Aggregate, error)

Create creates a new aggregate instance

func (*AggregateFactory) CreateNew

func (af *AggregateFactory) CreateNew(aggregateType string) (Aggregate, error)

CreateNew creates a new aggregate with a generated UUID

func (*AggregateFactory) CreateWithID

func (af *AggregateFactory) CreateWithID(aggregateType, id string) (Aggregate, error)

CreateWithID creates a new aggregate instance with a specific ID

func (*AggregateFactory) GetRegisteredTypes

func (af *AggregateFactory) GetRegisteredTypes() []string

GetRegisteredTypes returns all registered aggregate types

func (*AggregateFactory) RegisterType

func (af *AggregateFactory) RegisterType(aggregateType string, aggregateStruct interface{})

RegisterType registers an aggregate type

type AggregateRepository

type AggregateRepository struct {
	// contains filtered or unexported fields
}

AggregateRepository handles aggregate persistence and retrieval

func NewAggregateRepository

func NewAggregateRepository(eventStore EventStore, config AggregateConfig, logger *logrus.Logger) *AggregateRepository

NewAggregateRepository creates a new aggregate repository

func (*AggregateRepository) Delete

func (ar *AggregateRepository) Delete(ctx context.Context, aggregate Aggregate, deletionEvent Event) error

Delete marks an aggregate as deleted (typically by adding a deletion event)

func (*AggregateRepository) Exists

func (ar *AggregateRepository) Exists(ctx context.Context, aggregateID, aggregateType string) (bool, error)

Exists checks if an aggregate exists

func (*AggregateRepository) GetVersion

func (ar *AggregateRepository) GetVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)

GetVersion returns the current version of an aggregate

func (*AggregateRepository) Load

func (ar *AggregateRepository) Load(ctx context.Context, aggregateID, aggregateType string, aggregate Aggregate) error

Load loads an aggregate from the event store

func (*AggregateRepository) Save

func (ar *AggregateRepository) Save(ctx context.Context, aggregate Aggregate) error

Save saves an aggregate by persisting its uncommitted events

type AsyncCommandResult

// AsyncCommandResult represents the result of an async command execution.
// It is the element type of the channel returned by CommandBus.ExecuteAsync.
type AsyncCommandResult struct {
	// Result is the command result.
	// NOTE(review): presumably nil when Error is set — confirm.
	Result *CommandResult
	// Error is the execution error, if any.
	Error  error
}

AsyncCommandResult represents the result of an async command execution

type AsyncQueryResult

// AsyncQueryResult represents the result of an async query execution.
// It is the element type of the channel returned by QueryBus.ExecuteAsync.
type AsyncQueryResult struct {
	// Result is the query result.
	// NOTE(review): presumably nil when Error is set — confirm.
	Result *QueryResult
	// Error is the execution error, if any.
	Error  error
}

AsyncQueryResult represents the result of an async query execution

type BaseAggregate

// BaseAggregate provides a base implementation for aggregates.
//
// Its pointer type carries the seven methods of the Aggregate interface plus
// AddEvent; ApplyEvent is documented as meant to be overridden by concrete
// aggregates.
type BaseAggregate struct {
	ID                string    `json:"id"`
	Type              string    `json:"type"`
	Version           int       `json:"version"`
	// UncommittedEvents is the list AddEvent adds to. The "-" tag keeps it
	// out of the JSON encoding.
	UncommittedEvents []Event   `json:"-"`
	CreatedAt         time.Time `json:"created_at"`
	UpdatedAt         time.Time `json:"updated_at"`
	TenantID          string    `json:"tenant_id"`
}

BaseAggregate provides a base implementation for aggregates

func (*BaseAggregate) AddEvent

func (ba *BaseAggregate) AddEvent(event Event)

AddEvent adds an event to the uncommitted events list

func (*BaseAggregate) ApplyEvent

func (ba *BaseAggregate) ApplyEvent(event Event) error

ApplyEvent applies an event to the aggregate (to be overridden by concrete aggregates)

func (*BaseAggregate) GetID

func (ba *BaseAggregate) GetID() string

GetID returns the aggregate ID

func (*BaseAggregate) GetType

func (ba *BaseAggregate) GetType() string

GetType returns the aggregate type

func (*BaseAggregate) GetUncommittedEvents

func (ba *BaseAggregate) GetUncommittedEvents() []Event

GetUncommittedEvents returns uncommitted events

func (*BaseAggregate) GetVersion

func (ba *BaseAggregate) GetVersion() int

GetVersion returns the aggregate version

func (*BaseAggregate) LoadFromHistory

func (ba *BaseAggregate) LoadFromHistory(events []*StoredEvent) error

LoadFromHistory loads the aggregate from historical events

func (*BaseAggregate) MarkEventsAsCommitted

func (ba *BaseAggregate) MarkEventsAsCommitted()

MarkEventsAsCommitted marks all uncommitted events as committed

type BaseCommand

// BaseCommand provides a base implementation for commands.
//
// *BaseCommand has GetCommandType, GetAggregateID and GetMetadata, which are
// the three methods of the Command interface.
type BaseCommand struct {
	// CommandType is returned by GetCommandType.
	CommandType   string                 `json:"command_type"`
	// AggregateID is returned by GetAggregateID.
	AggregateID   string                 `json:"aggregate_id"`
	// Metadata is returned by GetMetadata.
	Metadata      map[string]interface{} `json:"metadata"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
	CorrelationID string                 `json:"correlation_id"`
}

BaseCommand provides a base implementation for commands

func (*BaseCommand) GetAggregateID

func (bc *BaseCommand) GetAggregateID() string

GetAggregateID returns the aggregate ID

func (*BaseCommand) GetCommandType

func (bc *BaseCommand) GetCommandType() string

GetCommandType returns the command type

func (*BaseCommand) GetMetadata

func (bc *BaseCommand) GetMetadata() map[string]interface{}

GetMetadata returns the command metadata

type BaseEvent added in v1.0.0

// BaseEvent provides a basic implementation of Event.
//
// *BaseEvent has the seven getters of the Event interface plus WithMetadata,
// which adds a metadata entry and returns *BaseEvent. NewEvent creates a new
// BaseEvent.
type BaseEvent struct {
	EventType     string                 `json:"event_type"`
	AggregateID   string                 `json:"aggregate_id"`
	AggregateType string                 `json:"aggregate_type"`
	// EventData is the payload exposed through GetEventData.
	EventData     map[string]interface{} `json:"event_data"`
	Metadata      map[string]interface{} `json:"metadata"`
	Timestamp     time.Time              `json:"timestamp"`
	Version       int                    `json:"version"`
	// NOTE(review): Data has the same type as EventData but a separate
	// JSON key ("data"). Which one NewEvent populates, and whether the two
	// are kept in sync, is not visible here — confirm before using Data.
	Data          map[string]interface{} `json:"data"`
	CorrelationID string                 `json:"correlation_id"`
	CausationID   string                 `json:"causation_id"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
}

BaseEvent provides a basic implementation of Event

func (*BaseEvent) GetAggregateID added in v1.0.0

func (e *BaseEvent) GetAggregateID() string

GetAggregateID returns the aggregate ID

func (*BaseEvent) GetAggregateType added in v1.0.0

func (e *BaseEvent) GetAggregateType() string

GetAggregateType returns the aggregate type

func (*BaseEvent) GetEventData added in v1.0.0

func (e *BaseEvent) GetEventData() map[string]interface{}

GetEventData returns the event data

func (*BaseEvent) GetEventType added in v1.0.0

func (e *BaseEvent) GetEventType() string

GetEventType returns the event type

func (*BaseEvent) GetMetadata added in v1.0.0

func (e *BaseEvent) GetMetadata() map[string]interface{}

GetMetadata returns the event metadata

func (*BaseEvent) GetTimestamp added in v1.0.0

func (e *BaseEvent) GetTimestamp() time.Time

GetTimestamp returns the event timestamp

func (*BaseEvent) GetVersion added in v1.0.0

func (e *BaseEvent) GetVersion() int

GetVersion returns the event version

func (*BaseEvent) WithMetadata added in v1.0.0

func (e *BaseEvent) WithMetadata(key string, value interface{}) *BaseEvent

WithMetadata adds metadata to an event

type BaseProjection

// BaseProjection provides a base implementation for projections.
//
// The methods documented for *BaseProjection are GetName, GetEventTypes,
// GetPosition and SetPosition. Project and Reset from the Projection
// interface are not among them, so a type embedding BaseProjection
// presumably has to supply those two itself.
type BaseProjection struct {
	// Name is the projection name returned by GetName.
	Name       string   `json:"name"`
	// EventTypes lists the event types this projection handles.
	EventTypes []string `json:"event_types"`
	// Position is the current projection position (GetPosition/SetPosition).
	Position   int64    `json:"position"`
	// Logger and ReadStore carry no json tag.
	Logger     *logrus.Logger
	ReadStore  *ReadModelStore
}

BaseProjection provides a base implementation for projections

func (*BaseProjection) GetEventTypes

func (bp *BaseProjection) GetEventTypes() []string

GetEventTypes returns the event types this projection handles

func (*BaseProjection) GetName

func (bp *BaseProjection) GetName() string

GetName returns the projection name

func (*BaseProjection) GetPosition

func (bp *BaseProjection) GetPosition() int64

GetPosition returns the current projection position

func (*BaseProjection) SetPosition

func (bp *BaseProjection) SetPosition(position int64)

SetPosition sets the projection position

type BaseQuery

// BaseQuery provides a base implementation for queries.
//
// *BaseQuery has GetQueryType, GetParameters, GetCacheKey and IsCacheable,
// which are the four methods of the Query interface.
type BaseQuery struct {
	// QueryType is returned by GetQueryType.
	QueryType     string                 `json:"query_type"`
	// Parameters is returned by GetParameters.
	Parameters    map[string]interface{} `json:"parameters"`
	// CacheKey is returned by GetCacheKey.
	CacheKey      string                 `json:"cache_key"`
	// Cacheable is reported by IsCacheable.
	Cacheable     bool                   `json:"cacheable"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
	CorrelationID string                 `json:"correlation_id"`
}

BaseQuery provides a base implementation for queries

func (*BaseQuery) GetCacheKey

func (bq *BaseQuery) GetCacheKey() string

GetCacheKey returns the cache key

func (*BaseQuery) GetParameters

func (bq *BaseQuery) GetParameters() map[string]interface{}

GetParameters returns the query parameters

func (*BaseQuery) GetQueryType

func (bq *BaseQuery) GetQueryType() string

GetQueryType returns the query type

func (*BaseQuery) IsCacheable

func (bq *BaseQuery) IsCacheable() bool

IsCacheable returns whether the query is cacheable

type BaseReadModel

// BaseReadModel provides a base implementation for read models.
//
// *BaseReadModel has GetID, GetType, GetVersion and GetLastUpdated, which
// are the four methods of the ReadModel interface.
type BaseReadModel struct {
	ID          string                 `json:"id"`
	Type        string                 `json:"type"`
	Version     int                    `json:"version"`
	// LastUpdated is the timestamp returned by GetLastUpdated.
	LastUpdated time.Time              `json:"last_updated"`
	TenantID    string                 `json:"tenant_id"`
	// Metadata is left out of the JSON encoding when empty (omitempty).
	Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

BaseReadModel provides a base implementation for read models

func (*BaseReadModel) GetID

func (brm *BaseReadModel) GetID() string

GetID returns the read model ID

func (*BaseReadModel) GetLastUpdated

func (brm *BaseReadModel) GetLastUpdated() time.Time

GetLastUpdated returns the last updated timestamp

func (*BaseReadModel) GetType

func (brm *BaseReadModel) GetType() string

GetType returns the read model type

func (*BaseReadModel) GetVersion

func (brm *BaseReadModel) GetVersion() int

GetVersion returns the read model version

type BatchCommand

// BatchCommand represents a batch of commands to execute. It is the input
// to CommandBus.ExecuteBatch.
type BatchCommand struct {
	// Commands are the commands in the batch.
	// NOTE(review): execution order is presumably slice order — confirm.
	Commands    []Command `json:"commands"`
	Transaction bool      `json:"transaction"` // If true, all commands must succeed
}

BatchCommand represents a batch of commands to execute

type BatchResult

// BatchResult represents the result of batch command execution. It is
// returned by CommandBus.ExecuteBatch.
type BatchResult struct {
	Results    []*CommandResult `json:"results"`
	Success    bool             `json:"success"`
	// NOTE(review): with omitempty a FailedAt of 0 is dropped from the JSON
	// output, so a failure of the first command looks the same as "no
	// failure" to JSON consumers. Check Success as well.
	FailedAt   int              `json:"failed_at,omitempty"` // Index of failed command
	TotalCount int              `json:"total_count"`
}

BatchResult represents the result of batch command execution

type CacheStats

// CacheStats represents cache statistics. QueryCache.GetStats returns it by
// value; QueryBus.GetCacheStats returns a pointer to it.
type CacheStats struct {
	Hits     int64   `json:"hits"`
	Misses   int64   `json:"misses"`
	Entries  int     `json:"entries"`
	// NOTE(review): presumably Hits / (Hits + Misses); the formula and the
	// value when both are zero are not visible here — confirm.
	HitRatio float64 `json:"hit_ratio"`
}

CacheStats represents cache statistics

type Command

// Command represents a command that can be executed. CommandBus.Execute,
// ExecuteAsync and ValidateCommand accept values of this interface.
type Command interface {
	// GetCommandType returns the command type.
	GetCommandType() string
	// GetAggregateID returns the aggregate ID.
	GetAggregateID() string
	// GetMetadata returns the command metadata.
	GetMetadata() map[string]interface{}
}

Command represents a command that can be executed

type CommandBus

type CommandBus struct {
	// contains filtered or unexported fields
}

CommandBus handles command dispatching and execution

func NewCommandBus

func NewCommandBus(config CommandBusConfig, logger *logrus.Logger) *CommandBus

NewCommandBus creates a new command bus

func (*CommandBus) Close

func (cb *CommandBus) Close() error

Close closes the command bus

func (*CommandBus) Execute

func (cb *CommandBus) Execute(ctx context.Context, command Command) (*CommandResult, error)

Execute executes a command

func (*CommandBus) ExecuteAsync

func (cb *CommandBus) ExecuteAsync(ctx context.Context, command Command) <-chan *AsyncCommandResult

ExecuteAsync executes a command asynchronously

func (*CommandBus) ExecuteBatch

func (cb *CommandBus) ExecuteBatch(ctx context.Context, batch *BatchCommand) (*BatchResult, error)

ExecuteBatch executes multiple commands in a batch

func (*CommandBus) GetHandlerCount

func (cb *CommandBus) GetHandlerCount() int

GetHandlerCount returns the number of registered handlers

func (*CommandBus) GetRegisteredCommands

func (cb *CommandBus) GetRegisteredCommands() []string

GetRegisteredCommands returns a list of registered command types

func (*CommandBus) HealthCheck

func (cb *CommandBus) HealthCheck(ctx context.Context) error

HealthCheck performs a health check on the command bus

func (*CommandBus) RegisterHandler

func (cb *CommandBus) RegisterHandler(handler CommandHandler) error

RegisterHandler registers a command handler

func (*CommandBus) UnregisterHandler

func (cb *CommandBus) UnregisterHandler(commandType string) error

UnregisterHandler removes a command handler

func (*CommandBus) ValidateCommand

func (cb *CommandBus) ValidateCommand(command Command) error

ValidateCommand validates a command before execution

type CommandBusConfig

// CommandBusConfig holds command bus configuration. It is passed by value
// to NewCommandBus.
type CommandBusConfig struct {
	EnableMetrics   bool `json:"enable_metrics"`
	EnableTracing   bool `json:"enable_tracing"`
	// NOTE(review): presumably the maximum number of commands executing
	// concurrently; the meaning of zero is not visible here — confirm.
	MaxConcurrent   int  `json:"max_concurrent"`
	// TimeoutDuration is a plain int number of seconds, whereas
	// QueryBusConfig.TimeoutDuration is a time.Duration. Do not copy values
	// between the two without converting.
	TimeoutDuration int  `json:"timeout_duration"` // seconds
}

CommandBusConfig holds command bus configuration

type CommandHandler

// CommandHandler defines the interface for command handlers. Handlers are
// registered with CommandBus.RegisterHandler.
type CommandHandler interface {
	// Handle processes command and returns its result or an error.
	Handle(ctx context.Context, command Command) (*CommandResult, error)
	// GetCommandType returns the command type of this handler.
	// NOTE(review): presumably the key the bus dispatches on, matching
	// Command.GetCommandType — confirm.
	GetCommandType() string
}

CommandHandler defines the interface for command handlers

type CommandResult

// CommandResult represents the result of a command execution.
type CommandResult struct {
	AggregateID   string                 `json:"aggregate_id"`
	// EventsEmitted is typed []interface{}, not []Event, so its elements
	// are not statically guaranteed to implement Event.
	EventsEmitted []interface{}          `json:"events_emitted"`
	Success       bool                   `json:"success"`
	// Message and Metadata are left out of the JSON encoding when empty.
	Message       string                 `json:"message,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
	Version       int                    `json:"version"`
}

CommandResult represents the result of a command execution

type Event added in v1.0.0

// Event represents a domain event. NewEvent returns a value of this
// interface, and FromEvent converts one into a StoredEvent.
type Event interface {
	// GetEventType returns the event type.
	GetEventType() string
	// GetAggregateID returns the aggregate ID.
	GetAggregateID() string
	// GetAggregateType returns the aggregate type.
	GetAggregateType() string
	// GetEventData returns the event data.
	GetEventData() map[string]interface{}
	// GetMetadata returns the event metadata.
	GetMetadata() map[string]interface{}
	// GetTimestamp returns the event timestamp.
	GetTimestamp() time.Time
	// GetVersion returns the event version.
	GetVersion() int
}

Event represents a domain event

func NewEvent added in v1.0.0

func NewEvent(eventType, aggregateID, aggregateType string, data map[string]interface{}, version int) Event

NewEvent creates a new BaseEvent

type EventStore

// EventStore defines the complete contract for event storage.
// NewAggregateRepository and NewProjectionManager both take an EventStore.
type EventStore interface {
	// Event operations

	// AppendEvents stores events for the given aggregate.
	// NOTE(review): expectedVersion is presumably an optimistic-concurrency
	// check against the stored aggregate version — confirm with an
	// implementation.
	AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, events []Event) error
	// GetEventStream returns the aggregate's event stream.
	// NOTE(review): whether fromVersion is inclusive is not visible here.
	GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
	// GetEventsByTimeRange returns up to limit stored events between start
	// and end.
	// NOTE(review): bound inclusivity and ordering are not visible here.
	GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
	// GetEventsByType returns stored events of eventType, paged by limit
	// and offset.
	GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
	// GetEventCount returns an event count as int64.
	// NOTE(review): presumably the total number of stored events — confirm.
	GetEventCount(ctx context.Context) (int64, error)

	// Aggregate operations

	// GetAggregateVersion returns the version of the given aggregate.
	GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)

	// Snapshot operations

	// GetLatestSnapshot returns the latest snapshot for the aggregate.
	// NOTE(review): the result when no snapshot exists (nil, nil vs. an
	// error) is not visible here — confirm.
	GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)
	// CreateSnapshot stores a snapshot of data at the given version.
	CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, data interface{}) error
}

EventStore defines the complete contract for event storage.

type EventStream added in v1.0.0

// EventStream represents a stream of events. It is returned by
// EventStore.GetEventStream; its GetEvents method returns the events as
// Event interfaces.
type EventStream struct {
	AggregateID   string         `json:"aggregate_id"`
	AggregateType string         `json:"aggregate_type"`
	// Events holds the stored events of the stream.
	Events        []*StoredEvent `json:"events"`
	Version       int            `json:"version"`
	// NOTE(review): FromVersion/ToVersion presumably bound the versions
	// covered by Events; inclusivity is not visible here — confirm.
	FromVersion   int            `json:"from_version"`
	ToVersion     int            `json:"to_version"`
	Timestamp     time.Time      `json:"timestamp"`
	// NOTE(review): presumably true when the stream was read starting from
	// a snapshot rather than from the first event — confirm.
	FromSnapshot  bool           `json:"from_snapshot"`
}

EventStream represents a stream of events

func (*EventStream) GetEvents added in v1.0.0

func (es *EventStream) GetEvents() ([]Event, error)

GetEvents returns the events as Event interfaces

type InMemoryQueryCache

type InMemoryQueryCache struct {
	// contains filtered or unexported fields
}

InMemoryQueryCache provides an in-memory implementation of QueryCache

func NewInMemoryQueryCache

func NewInMemoryQueryCache() *InMemoryQueryCache

NewInMemoryQueryCache creates a new in-memory query cache

func (*InMemoryQueryCache) Clear

func (imc *InMemoryQueryCache) Clear()

Clear removes all cached entries

func (*InMemoryQueryCache) Delete

func (imc *InMemoryQueryCache) Delete(key string)

Delete removes a cached entry

func (*InMemoryQueryCache) Get

func (imc *InMemoryQueryCache) Get(key string) (*QueryResult, bool)

Get retrieves a cached query result

func (*InMemoryQueryCache) GetStats

func (imc *InMemoryQueryCache) GetStats() CacheStats

GetStats returns cache statistics

func (*InMemoryQueryCache) Set

func (imc *InMemoryQueryCache) Set(key string, result *QueryResult, ttl time.Duration)

Set stores a query result in cache

type PaginatedQuery

// PaginatedQuery represents a query with pagination parameters.
// NewPaginatedQuery creates one from a query type, page and page size.
type PaginatedQuery struct {
	// BaseQuery is embedded by pointer, so its Query methods are promoted.
	// NOTE(review): a PaginatedQuery built as a literal without BaseQuery
	// has a nil embedded pointer — prefer NewPaginatedQuery.
	*BaseQuery
	// NOTE(review): whether Page is 0- or 1-based is not visible here.
	Page     int    `json:"page"`
	PageSize int    `json:"page_size"`
	// SortBy and SortDir are left out of the JSON encoding when empty.
	SortBy   string `json:"sort_by,omitempty"`
	SortDir  string `json:"sort_dir,omitempty"` // asc, desc
}

PaginatedQuery represents a query with pagination parameters

func NewPaginatedQuery

func NewPaginatedQuery(queryType string, page, pageSize int) *PaginatedQuery

NewPaginatedQuery creates a new paginated query

type PaginatedResult

// PaginatedResult represents a paginated query result.
type PaginatedResult struct {
	// QueryResult is embedded by pointer, so its fields are promoted.
	// NOTE(review): accessing promoted fields panics if it is nil.
	*QueryResult
	Page        int  `json:"page"`
	PageSize    int  `json:"page_size"`
	TotalCount  int  `json:"total_count"`
	// NOTE(review): TotalPages, HasNext and HasPrevious match the
	// (int, bool, bool) result shape of CalculatePagination and are
	// presumably filled from it — confirm.
	TotalPages  int  `json:"total_pages"`
	HasNext     bool `json:"has_next"`
	HasPrevious bool `json:"has_previous"`
}

PaginatedResult represents a paginated query result

type Projection

// Projection defines the interface for event projections. Projections are
// registered with ProjectionManager.RegisterProjection.
type Projection interface {
	// GetName returns the projection name.
	GetName() string
	// GetEventTypes returns the event types this projection handles.
	GetEventTypes() []string
	// Project handles one stored event and returns a ProjectionResult or
	// an error.
	Project(ctx context.Context, event *StoredEvent) (*ProjectionResult, error)
	// Reset resets the projection.
	// NOTE(review): what state is cleared is implementation-defined.
	Reset(ctx context.Context) error
	// GetPosition returns the current projection position.
	GetPosition() int64
	// SetPosition sets the projection position.
	SetPosition(position int64)
}

Projection defines the interface for event projections

type ProjectionConfig

// ProjectionConfig holds projection manager configuration. It is passed by
// value to NewProjectionManager.
type ProjectionConfig struct {
	// NOTE(review): presumably the number of events handled per poll.
	BatchSize       int           `json:"batch_size"`
	// PollInterval is a time.Duration, which encoding/json writes as an
	// integer number of nanoseconds.
	PollInterval    time.Duration `json:"poll_interval"`
	// NOTE(review): presumably the number of retries after a projection
	// error, with ErrorRetryDelay between attempts — confirm.
	ErrorRetryCount int           `json:"error_retry_count"`
	ErrorRetryDelay time.Duration `json:"error_retry_delay"`
	EnableMetrics   bool          `json:"enable_metrics"`
}

ProjectionConfig holds projection manager configuration

type ProjectionManager

type ProjectionManager struct {
	// contains filtered or unexported fields
}

ProjectionManager manages event projections to read models

func NewProjectionManager

func NewProjectionManager(eventStore EventStore, readStore *ReadModelStore, config ProjectionConfig, logger *logrus.Logger) *ProjectionManager

NewProjectionManager creates a new projection manager

func (*ProjectionManager) Close

func (pm *ProjectionManager) Close() error

Close closes the projection manager

func (*ProjectionManager) GetAllProjectionStates

func (pm *ProjectionManager) GetAllProjectionStates() ([]*ProjectionState, error)

GetAllProjectionStates returns the state of all projections

func (*ProjectionManager) GetProjectionCount

func (pm *ProjectionManager) GetProjectionCount() int

GetProjectionCount returns the number of registered projections

func (*ProjectionManager) GetProjectionState

func (pm *ProjectionManager) GetProjectionState(projectionName string) (*ProjectionState, error)

GetProjectionState returns the state of a projection

func (*ProjectionManager) HealthCheck

func (pm *ProjectionManager) HealthCheck(ctx context.Context) error

HealthCheck performs a health check on the projection manager

func (*ProjectionManager) IsRunning

func (pm *ProjectionManager) IsRunning() bool

IsRunning returns whether the projection manager is running

func (*ProjectionManager) ProjectEvents

func (pm *ProjectionManager) ProjectEvents(ctx context.Context, projectionName string, fromPosition int64) error

ProjectEvents manually projects events for a specific projection

func (*ProjectionManager) RebuildProjection

func (pm *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error

RebuildProjection rebuilds a projection from scratch

func (*ProjectionManager) RegisterProjection

func (pm *ProjectionManager) RegisterProjection(projection Projection) error

RegisterProjection registers a projection

func (*ProjectionManager) ResetProjection

func (pm *ProjectionManager) ResetProjection(ctx context.Context, projectionName string) error

ResetProjection resets a projection to start from the beginning

func (*ProjectionManager) Start

func (pm *ProjectionManager) Start(ctx context.Context) error

Start starts the projection manager

func (*ProjectionManager) Stop

func (pm *ProjectionManager) Stop() error

Stop stops the projection manager

func (*ProjectionManager) UnregisterProjection

func (pm *ProjectionManager) UnregisterProjection(name string) error

UnregisterProjection removes a projection

type ProjectionResult

// ProjectionResult represents the result of a projection. It is returned
// by Projection.Project.
type ProjectionResult struct {
	// ReadModels are the read models carried by this result.
	// NOTE(review): presumably saved to the ReadModelStore by the manager.
	ReadModels []ReadModel `json:"read_models"`
	Success    bool        `json:"success"`
	// Message is left out of the JSON encoding when empty.
	Message    string      `json:"message,omitempty"`
	Position   int64       `json:"position"`
}

ProjectionResult represents the result of a projection

type ProjectionState

// ProjectionState represents the state of a projection. It is returned by
// ProjectionManager.GetProjectionState and GetAllProjectionStates.
type ProjectionState struct {
	Name        string    `json:"name"`
	Position    int64     `json:"position"`
	LastUpdated time.Time `json:"last_updated"`
	EventsCount int64     `json:"events_count"`
	ErrorsCount int64     `json:"errors_count"`
	IsRunning   bool      `json:"is_running"`
	// LastError is left out of the JSON encoding when empty.
	LastError   string    `json:"last_error,omitempty"`
}

ProjectionState represents the state of a projection

type Query

// Query represents a query that can be executed. QueryBus.Execute,
// ExecuteAsync and ValidateQuery accept values of this interface.
type Query interface {
	// GetQueryType returns the query type.
	GetQueryType() string
	// GetParameters returns the query parameters.
	GetParameters() map[string]interface{}
	// GetCacheKey returns the cache key.
	GetCacheKey() string
	// IsCacheable returns whether the query is cacheable.
	IsCacheable() bool
}

Query represents a query that can be executed

type QueryBus

type QueryBus struct {
	// contains filtered or unexported fields
}

QueryBus handles query dispatching and execution

func NewQueryBus

func NewQueryBus(config QueryBusConfig, logger *logrus.Logger) *QueryBus

NewQueryBus creates a new query bus

func (*QueryBus) Close

func (qb *QueryBus) Close() error

Close closes the query bus

func (*QueryBus) Execute

func (qb *QueryBus) Execute(ctx context.Context, query Query) (*QueryResult, error)

Execute executes a query

func (*QueryBus) ExecuteAsync

func (qb *QueryBus) ExecuteAsync(ctx context.Context, query Query) <-chan *AsyncQueryResult

ExecuteAsync executes a query asynchronously

func (*QueryBus) GetCacheStats

func (qb *QueryBus) GetCacheStats() *CacheStats

GetCacheStats returns cache statistics

func (*QueryBus) GetHandlerCount

func (qb *QueryBus) GetHandlerCount() int

GetHandlerCount returns the number of registered handlers

func (*QueryBus) GetRegisteredQueries

func (qb *QueryBus) GetRegisteredQueries() []string

GetRegisteredQueries returns a list of registered query types

func (*QueryBus) HealthCheck

func (qb *QueryBus) HealthCheck(ctx context.Context) error

HealthCheck performs a health check on the query bus

func (*QueryBus) InvalidateCache

func (qb *QueryBus) InvalidateCache(pattern string)

InvalidateCache invalidates cached results for a specific pattern

func (*QueryBus) RegisterHandler

func (qb *QueryBus) RegisterHandler(handler QueryHandler) error

RegisterHandler registers a query handler

func (*QueryBus) UnregisterHandler

func (qb *QueryBus) UnregisterHandler(queryType string) error

UnregisterHandler removes a query handler

func (*QueryBus) ValidateQuery

func (qb *QueryBus) ValidateQuery(query Query) error

ValidateQuery validates a query before execution

type QueryBusConfig

// QueryBusConfig holds query bus configuration. It is passed by value to
// NewQueryBus.
type QueryBusConfig struct {
	EnableCache     bool          `json:"enable_cache"`
	// NOTE(review): presumably the TTL applied to cached query results
	// when EnableCache is true — confirm.
	CacheTTL        time.Duration `json:"cache_ttl"`
	EnableMetrics   bool          `json:"enable_metrics"`
	EnableTracing   bool          `json:"enable_tracing"`
	MaxConcurrent   int           `json:"max_concurrent"`
	// TimeoutDuration is a time.Duration here, whereas
	// CommandBusConfig.TimeoutDuration is an int number of seconds.
	TimeoutDuration time.Duration `json:"timeout_duration"`
}

QueryBusConfig holds query bus configuration

type QueryCache

// QueryCache is the interface for caching query results.
// InMemoryQueryCache is the in-memory implementation in this package.
type QueryCache interface {
	// Get retrieves a cached query result; the bool reports whether one
	// was returned.
	Get(key string) (*QueryResult, bool)
	// Set stores a query result in the cache with the given ttl.
	Set(key string, result *QueryResult, ttl time.Duration)
	// Delete removes a cached entry.
	Delete(key string)
	// Clear removes all cached entries.
	Clear()
	// GetStats returns cache statistics.
	GetStats() CacheStats
}

QueryCache is the interface for caching query results.

type QueryHandler

// QueryHandler defines the interface for query handlers. Handlers are
// registered with QueryBus.RegisterHandler.
type QueryHandler interface {
	// Handle processes query and returns its result or an error.
	Handle(ctx context.Context, query Query) (*QueryResult, error)
	// GetQueryType returns the query type of this handler.
	// NOTE(review): presumably the key the bus dispatches on, matching
	// Query.GetQueryType — confirm.
	GetQueryType() string
}

QueryHandler defines the interface for query handlers

type QueryResult

// QueryResult represents the result of a query execution. It is also the
// value type stored in a QueryCache.
type QueryResult struct {
	Data      interface{}            `json:"data"`
	Success   bool                   `json:"success"`
	// Message, Metadata and Count are left out of the JSON encoding when
	// empty; a Count of 0 is therefore absent rather than written as 0.
	Message   string                 `json:"message,omitempty"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
	Count     int                    `json:"count,omitempty"`
	// NOTE(review): presumably set when the result was served by the
	// query cache — confirm.
	FromCache bool                   `json:"from_cache"`
	Timestamp time.Time              `json:"timestamp"`
}

QueryResult represents the result of a query execution

type ReadModel

// ReadModel represents a read-optimized data model. ReadModelStore.Save
// and SaveWithTTL accept values of this interface.
type ReadModel interface {
	// GetID returns the read model ID.
	GetID() string
	// GetType returns the read model type.
	GetType() string
	// GetVersion returns the read model version.
	GetVersion() int
	// GetLastUpdated returns the last updated timestamp.
	GetLastUpdated() time.Time
}

ReadModel represents a read-optimized data model

type ReadModelConfig

// ReadModelConfig holds read model configuration. It is passed by value to
// NewReadModelStore.
type ReadModelConfig struct {
	// NOTE(review): presumably prepended to the read-model table name.
	TablePrefix      string        `json:"table_prefix"`
	EnableVersioning bool          `json:"enable_versioning"`
	// NOTE(review): EnableTTL and DefaultTTL presumably control the expiry
	// applied by Save; SaveWithTTL takes its own ttl argument — confirm.
	EnableTTL        bool          `json:"enable_ttl"`
	DefaultTTL       time.Duration `json:"default_ttl"`
	BatchSize        int           `json:"batch_size"`
}

ReadModelConfig holds read model configuration

type ReadModelRecord

// ReadModelRecord represents a stored read model. ReadModelStore.List
// returns records of this type.
type ReadModelRecord struct {
	// ID is the primary key, declared with column type uuid. No database
	// default is declared, unlike Snapshot.ID and StoredEvent.ID.
	ID        string     `gorm:"type:uuid;primary_key" json:"id"`
	Type      string     `gorm:"type:varchar(255);not null;index" json:"type"`
	// Data is a Go string stored in a jsonb column.
	// NOTE(review): presumably the JSON-encoded read model.
	Data      string     `gorm:"type:jsonb;not null" json:"data"`
	Version   int        `gorm:"not null;default:1" json:"version"`
	CreatedAt time.Time  `gorm:"autoCreateTime" json:"created_at"`
	UpdatedAt time.Time  `gorm:"autoUpdateTime" json:"updated_at"`
	// ExpiresAt is a pointer, so nil means no value is set.
	// NOTE(review): presumably what CleanupExpired filters on — confirm.
	ExpiresAt *time.Time `gorm:"index" json:"expires_at,omitempty"`
	// TenantID is declared with column type uuid here, while
	// StoredEvent.TenantID declares no column type.
	TenantID  string     `gorm:"type:uuid;index" json:"tenant_id"`
	Metadata  string     `gorm:"type:jsonb" json:"metadata,omitempty"`
}

ReadModelRecord represents a stored read model

type ReadModelStore

type ReadModelStore struct {
	// contains filtered or unexported fields
}

ReadModelStore handles storage and retrieval of read models

func NewReadModelStore

func NewReadModelStore(db *gorm.DB, config ReadModelConfig, logger *logrus.Logger) (*ReadModelStore, error)

NewReadModelStore creates a new read model store

func (*ReadModelStore) CleanupExpired

func (rms *ReadModelStore) CleanupExpired(ctx context.Context) (int64, error)

CleanupExpired removes expired read models

func (*ReadModelStore) Close

func (rms *ReadModelStore) Close() error

Close closes the read model store

func (*ReadModelStore) Delete

func (rms *ReadModelStore) Delete(ctx context.Context, id, modelType string) error

Delete removes a read model

func (*ReadModelStore) DeleteByType

func (rms *ReadModelStore) DeleteByType(ctx context.Context, modelType string) error

DeleteByType removes all read models of a specific type

func (*ReadModelStore) Exists

func (rms *ReadModelStore) Exists(ctx context.Context, id, modelType string) (bool, error)

Exists checks if a read model exists

func (*ReadModelStore) Get

func (rms *ReadModelStore) Get(ctx context.Context, id, modelType string, result interface{}) error

Get retrieves a read model by ID and type

func (*ReadModelStore) GetStats

func (rms *ReadModelStore) GetStats(ctx context.Context) (map[string]interface{}, error)

GetStats returns statistics about stored read models

func (*ReadModelStore) List

func (rms *ReadModelStore) List(ctx context.Context, modelType string, filter map[string]interface{}, limit, offset int) ([]*ReadModelRecord, int64, error)

List retrieves multiple read models by type with optional filtering

func (*ReadModelStore) Save

func (rms *ReadModelStore) Save(ctx context.Context, model ReadModel) error

Save saves a read model to the store

func (*ReadModelStore) SaveWithTTL

func (rms *ReadModelStore) SaveWithTTL(ctx context.Context, model ReadModel, ttl time.Duration) error

SaveWithTTL saves a read model with a custom TTL

type Snapshot added in v1.0.0

// Snapshot represents an aggregate snapshot. It is returned by
// EventStore.GetLatestSnapshot.
type Snapshot struct {
	// ID is the primary key (column type uuid) with the database-side
	// default gen_random_uuid().
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	AggregateID   string    `gorm:"not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"not null;index" json:"aggregate_type"`
	Version       int       `gorm:"not null" json:"version"`
	// Data is a Go string stored in a jsonb column.
	// NOTE(review): presumably the JSON-encoded aggregate state.
	Data          string    `gorm:"type:jsonb" json:"data"`
	Timestamp     time.Time `gorm:"not null" json:"timestamp"`
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
}

Snapshot represents an aggregate snapshot

type StoredEvent added in v1.0.0

// StoredEvent represents an event as stored in the event store.
// FromEvent creates a StoredEvent from an Event, and ToEvent converts one
// back to an Event.
type StoredEvent struct {
	// ID is the primary key (column type uuid) with the database-side
	// default gen_random_uuid().
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	EventType     string    `gorm:"not null;index" json:"event_type"`
	AggregateID   string    `gorm:"not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"not null;index" json:"aggregate_type"`
	// EventData and Metadata are Go strings stored in jsonb columns,
	// whereas Event exposes both as map[string]interface{}.
	EventData     string    `gorm:"type:jsonb" json:"event_data"`
	Metadata      string    `gorm:"type:jsonb" json:"metadata"`
	Version       int       `gorm:"not null;index" json:"version"`
	Timestamp     time.Time `gorm:"not null;index" json:"timestamp"`
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
	// The four ID fields below are indexed but not declared not null.
	CorrelationID string    `gorm:"index" json:"correlation_id"`
	CausationID   string    `gorm:"index" json:"causation_id"`
	UserID        string    `gorm:"index" json:"user_id"`
	TenantID      string    `gorm:"index" json:"tenant_id"`
}

StoredEvent represents an event as stored in the event store

func FromEvent added in v1.0.0

func FromEvent(event Event) (*StoredEvent, error)

FromEvent creates a StoredEvent from an Event

func (*StoredEvent) ToEvent added in v1.0.0

func (se *StoredEvent) ToEvent() (Event, error)

ToEvent converts a StoredEvent to an Event

type UserAggregate

// UserAggregate is an example aggregate. NewUserAggregate creates one from
// an id, email and name. It defines its own ApplyEvent plus the ChangeEmail
// and Deactivate operations.
type UserAggregate struct {
	// BaseAggregate is embedded by pointer, so its methods are promoted.
	// NOTE(review): a UserAggregate built as a literal without
	// BaseAggregate has a nil embedded pointer — prefer NewUserAggregate.
	*BaseAggregate
	Email     string     `json:"email"`
	Name      string     `json:"name"`
	IsActive  bool       `json:"is_active"`
	// DeletedAt is a pointer and is left out of the JSON encoding when nil.
	DeletedAt *time.Time `json:"deleted_at,omitempty"`
}

UserAggregate is an example aggregate.

func NewUserAggregate

func NewUserAggregate(id, email, name string) *UserAggregate

NewUserAggregate creates a new user aggregate

func (*UserAggregate) ApplyEvent

func (ua *UserAggregate) ApplyEvent(event Event) error

ApplyEvent applies events to the user aggregate

func (*UserAggregate) ChangeEmail

func (ua *UserAggregate) ChangeEmail(newEmail string, userID string)

ChangeEmail changes the user's email

func (*UserAggregate) Deactivate

func (ua *UserAggregate) Deactivate(userID string, reason string)

Deactivate deactivates the user

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL