Documentation ¶
Index ¶
- type BaseEvent
- type Event
- type EventHandler
- type EventMessage
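- type EventStore
- func NewEventStore(db *gorm.DB, config EventStoreConfig, logger *logrus.Logger) (*EventStore, error)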
- func (es *EventStore) AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, events []Event) error
- func (es *EventStore) Close() error
- func (es *EventStore) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, data interface{}) error
- func (es *EventStore) GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
- func (es *EventStore) GetEventCount(ctx context.Context) (int64, error)
- func (es *EventStore) GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
- func (es *EventStore) GetEventsByCorrelationID(ctx context.Context, correlationID string) ([]*StoredEvent, error)
- func (es *EventStore) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
- func (es *EventStore) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
- func (es *EventStore) GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)
- func (es *EventStore) ReplayEvents(ctx context.Context, aggregateID, aggregateType string, eventHandler func(*StoredEvent) error) error
- type EventStoreConfig
- type EventStream
- type EventSubscription
- type NATSClient
- func NewNATSClient(config NATSConfig, logger *logrus.Logger) (*NATSClient, error)
- func (nc *NATSClient) Close() error
- func (nc *NATSClient) CreateStream(config StreamConfig) error
- func (nc *NATSClient) GetStats() *nats.Statistics
- func (nc *NATSClient) GetStreamInfo(streamName string) (*StreamInfo, error)
- func (nc *NATSClient) HealthCheck(ctx context.Context) error
- func (nc *NATSClient) IsConnected() bool
- func (nc *NATSClient) ListStreams() ([]*StreamInfo, error)
- func (nc *NATSClient) PublishEvent(ctx context.Context, event *EventMessage) error
- func (nc *NATSClient) QueueSubscribe(handler EventHandler, queueGroup string) (*EventSubscription, error)
- func (nc *NATSClient) Subscribe(handler EventHandler) (*EventSubscription, error)
- func (nc *NATSClient) Unsubscribe(subscription *EventSubscription) error
- type NATSConfig
- type Snapshot
- type StoredEvent
- type StreamConfig
- type StreamInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseEvent ¶
type BaseEvent struct {
// The first six fields line up with the six accessors of the Event
// interface (GetAggregateID, GetEventType, GetEventData, GetMetadata,
// GetVersion, GetTimestamp), which *BaseEvent also declares.
// NOTE(review): presumably each accessor returns the matching field; the
// method bodies are not visible here, so confirm.
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type"`
// Data and Metadata carry no omitempty option, so a nil map is encoded as
// JSON null instead of being dropped from the output.
Data map[string]interface{} `json:"data"`
Metadata map[string]interface{} `json:"metadata"`
Version int `json:"version"`
Timestamp time.Time `json:"timestamp"`
// The four identifiers below are always written to JSON, even when empty
// (no omitempty). StoredEvent declares fields with the same four names.
CorrelationID string `json:"correlation_id"`
CausationID string `json:"causation_id"`
UserID string `json:"user_id"`
TenantID string `json:"tenant_id"`
}
BaseEvent provides a base implementation for events
func (*BaseEvent) GetAggregateID ¶
GetAggregateID returns the aggregate ID
func (*BaseEvent) GetEventData ¶
GetEventData returns the event data
func (*BaseEvent) GetEventType ¶
GetEventType returns the event type
func (*BaseEvent) GetMetadata ¶
GetMetadata returns the event metadata
func (*BaseEvent) GetTimestamp ¶
GetTimestamp returns the event timestamp
func (*BaseEvent) GetVersion ¶
GetVersion returns the event version
type Event ¶
type Event interface {
// GetEventType returns the event type.
GetEventType() string
// GetAggregateID returns the aggregate ID.
GetAggregateID() string
// GetEventData returns the event data.
GetEventData() map[string]interface{}
// GetMetadata returns the event metadata.
GetMetadata() map[string]interface{}
// GetVersion returns the event version.
GetVersion() int
// GetTimestamp returns the event timestamp.
GetTimestamp() time.Time
}
Event represents a domain event
type EventHandler ¶
type EventHandler interface {
// Handle is given a context and a single *EventMessage and reports failure
// through its error result. How a non-nil error is treated by the caller
// is not visible here.
Handle(ctx context.Context, event *EventMessage) error
// GetSubject returns a subject string.
// NOTE(review): Subscribe and QueueSubscribe accept only a handler (no
// separate subject argument), so this is presumably the subject they
// subscribe to; confirm.
GetSubject() string
// GetName returns a name for the handler.
// NOTE(review): presumably used to identify the handler (e.g. in logs);
// confirm.
GetName() string
}
EventHandler defines the interface for event handlers
type EventMessage ¶
type EventMessage struct {
ID string `json:"id"`
Type string `json:"type"`
Source string `json:"source"`
// NOTE(review): PublishEvent takes only a context and the message, so the
// subject published to presumably comes from this field; confirm.
Subject string `json:"subject"`
// Data has no omitempty option: a nil map is encoded as JSON null.
Data map[string]interface{} `json:"data"`
Timestamp time.Time `json:"timestamp"`
// Version is a string here, unlike the int Version fields declared on
// BaseEvent, StoredEvent and EventStream.
Version string `json:"version"`
// CorrelationID and Headers carry omitempty and are left out of the JSON
// output when empty.
CorrelationID string `json:"correlation_id,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
}
EventMessage represents an event message
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
//
// Instances are obtained from NewEventStore, which takes a *gorm.DB, an
// EventStoreConfig and a *logrus.Logger.
}
EventStore provides event sourcing capabilities
func NewEventStore ¶
func NewEventStore(db *gorm.DB, config EventStoreConfig, logger *logrus.Logger) (*EventStore, error)
NewEventStore creates a new event store
func (*EventStore) AppendEvents ¶
func (es *EventStore) AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, events []Event) error
AppendEvents appends events to an aggregate stream
func (*EventStore) Close ¶
func (es *EventStore) Close() error
func (*EventStore) CreateSnapshot ¶
func (es *EventStore) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, data interface{}) error
CreateSnapshot creates a snapshot of an aggregate
func (*EventStore) GetAggregateVersion ¶
func (es *EventStore) GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
GetAggregateVersion returns the current version of an aggregate
func (*EventStore) GetEventCount ¶
func (es *EventStore) GetEventCount(ctx context.Context) (int64, error)
GetEventCount returns the total number of events
func (*EventStore) GetEventStream ¶
func (es *EventStore) GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
GetEventStream retrieves an event stream for an aggregate
func (*EventStore) GetEventsByCorrelationID ¶
func (es *EventStore) GetEventsByCorrelationID(ctx context.Context, correlationID string) ([]*StoredEvent, error)
GetEventsByCorrelationID retrieves events by correlation ID
func (*EventStore) GetEventsByTimeRange ¶
func (es *EventStore) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
GetEventsByTimeRange retrieves events within a time range
func (*EventStore) GetEventsByType ¶
func (es *EventStore) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
GetEventsByType retrieves events by event type
func (*EventStore) GetLatestSnapshot ¶
func (es *EventStore) GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)
GetLatestSnapshot retrieves the latest snapshot for an aggregate
func (*EventStore) ReplayEvents ¶
func (es *EventStore) ReplayEvents(ctx context.Context, aggregateID, aggregateType string, eventHandler func(*StoredEvent) error) error
ReplayEvents replays events to rebuild aggregate state
type EventStoreConfig ¶
type EventStoreConfig struct {
// NOTE(review): TableName and SnapshotTable are presumably the database
// table names for events and snapshots respectively; confirm.
TableName string `json:"table_name"`
SnapshotTable string `json:"snapshot_table"`
// NOTE(review): the unit of SnapshotInterval is not shown (presumably a
// number of events/versions between snapshots); confirm.
SnapshotInterval int `json:"snapshot_interval"`
// RetentionPeriod is a time.Duration, which encoding/json writes and reads
// as an integer number of nanoseconds, not as a string such as "24h".
RetentionPeriod time.Duration `json:"retention_period"`
// NOTE(review): EnableSnapshot presumably switches snapshot handling on or
// off; confirm how it interacts with SnapshotInterval.
EnableSnapshot bool `json:"enable_snapshot"`
}
EventStoreConfig holds event store configuration
type EventStream ¶
type EventStream struct {
// AggregateID and AggregateType have the same names as the parameters of
// GetEventStream, the method that returns *EventStream.
AggregateID string `json:"aggregate_id"`
AggregateType string `json:"aggregate_type"`
Version int `json:"version"`
// Events holds pointers to StoredEvent. There is no omitempty option, so a
// nil slice is encoded as JSON null.
Events []*StoredEvent `json:"events"`
// NOTE(review): FromSnapshot and SnapshotVersion presumably report whether
// the stream was built on top of a snapshot and at which version; confirm.
FromSnapshot bool `json:"from_snapshot"`
SnapshotVersion int `json:"snapshot_version"`
}
EventStream represents a stream of events for an aggregate
type EventSubscription ¶
type EventSubscription struct {
Subject string `json:"subject"`
// Handler and Subscription are tagged json:"-" and therefore never appear
// in JSON output.
Handler EventHandler `json:"-"`
Subscription *nats.Subscription `json:"-"`
CreatedAt time.Time `json:"created_at"`
IsActive bool `json:"is_active"`
// MessageCount is an unsigned 64-bit counter.
// NOTE(review): whether it is updated atomically is not visible here;
// confirm before reading it from several goroutines.
MessageCount uint64 `json:"message_count"`
}
EventSubscription represents an active subscription
type NATSClient ¶
type NATSClient struct {
// contains filtered or unexported fields
//
// Instances are obtained from NewNATSClient, which takes a NATSConfig and a
// *logrus.Logger.
}
NATSClient provides event streaming capabilities using NATS
func NewNATSClient ¶
func NewNATSClient(config NATSConfig, logger *logrus.Logger) (*NATSClient, error)
NewNATSClient creates a new NATS client
func (*NATSClient) Close ¶
func (nc *NATSClient) Close() error
func (*NATSClient) CreateStream ¶
func (nc *NATSClient) CreateStream(config StreamConfig) error
CreateStream creates a JetStream stream
func (*NATSClient) GetStats ¶
func (nc *NATSClient) GetStats() *nats.Statistics
GetStats returns connection statistics
func (*NATSClient) GetStreamInfo ¶
func (nc *NATSClient) GetStreamInfo(streamName string) (*StreamInfo, error)
GetStreamInfo returns information about a stream
func (*NATSClient) HealthCheck ¶
func (nc *NATSClient) HealthCheck(ctx context.Context) error
HealthCheck performs a health check
func (*NATSClient) IsConnected ¶
func (nc *NATSClient) IsConnected() bool
IsConnected returns the connection status
func (*NATSClient) ListStreams ¶
func (nc *NATSClient) ListStreams() ([]*StreamInfo, error)
ListStreams returns a list of all streams
func (*NATSClient) PublishEvent ¶
func (nc *NATSClient) PublishEvent(ctx context.Context, event *EventMessage) error
PublishEvent publishes an event to a subject
func (*NATSClient) QueueSubscribe ¶
func (nc *NATSClient) QueueSubscribe(handler EventHandler, queueGroup string) (*EventSubscription, error)
QueueSubscribe creates a queue subscription for load balancing
func (*NATSClient) Subscribe ¶
func (nc *NATSClient) Subscribe(handler EventHandler) (*EventSubscription, error)
Subscribe creates a subscription to a subject with a handler
func (*NATSClient) Unsubscribe ¶
func (nc *NATSClient) Unsubscribe(subscription *EventSubscription) error
Unsubscribe removes a subscription
type NATSConfig ¶
type NATSConfig struct {
URL string `json:"url"`
ClusterID string `json:"cluster_id"`
ClientID string `json:"client_id"`
MaxReconnect int `json:"max_reconnect"`
// ReconnectWait and Timeout are time.Duration values; encoding/json writes
// and reads them as integer nanoseconds, not as strings such as "2s".
ReconnectWait time.Duration `json:"reconnect_wait"`
Timeout time.Duration `json:"timeout"`
// NOTE(review): EnableJetStream presumably gates the JetStream-related
// methods such as CreateStream; confirm.
EnableJetStream bool `json:"enable_jetstream"`
// StreamConfig is stored by value (not as a pointer), so it is always
// present in the JSON output under "stream_config".
StreamConfig StreamConfig `json:"stream_config"`
}
NATSConfig holds NATS client configuration
type Snapshot ¶
type Snapshot struct {
// ID is the primary key: a uuid column whose database default is
// gen_random_uuid().
ID string `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
// AggregateID is a non-null, indexed uuid column.
AggregateID string `gorm:"type:uuid;not null;index" json:"aggregate_id"`
AggregateType string `gorm:"type:varchar(255);not null" json:"aggregate_type"`
Version int `gorm:"not null" json:"version"`
// Data is a Go string mapped to a non-null jsonb column.
// NOTE(review): the string therefore has to hold valid JSON text when the
// row is written; confirm that CreateSnapshot serializes its data argument
// before storing it.
Data string `gorm:"type:jsonb;not null" json:"data"`
Timestamp time.Time `gorm:"not null;index" json:"timestamp"`
// CreatedAt is tagged autoCreateTime, so GORM sets it when the row is
// created.
CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"`
}
Snapshot represents an aggregate snapshot
type StoredEvent ¶
type StoredEvent struct {
// ID is the primary key: a uuid column whose database default is
// gen_random_uuid().
ID string `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
// AggregateID, AggregateType, Version and Timestamp are non-null and each
// carries its own single-column index.
AggregateID string `gorm:"type:uuid;not null;index" json:"aggregate_id"`
AggregateType string `gorm:"type:varchar(255);not null;index" json:"aggregate_type"`
EventType string `gorm:"type:varchar(255);not null" json:"event_type"`
// EventData is a Go string mapped to a non-null jsonb column; EventMetadata
// is jsonb without a not-null constraint.
// NOTE(review): both strings have to hold valid JSON text when written; an
// empty Go string is not valid JSON, so confirm how missing metadata is
// stored.
EventData string `gorm:"type:jsonb;not null" json:"event_data"`
EventMetadata string `gorm:"type:jsonb" json:"event_metadata"`
Version int `gorm:"not null;index" json:"version"`
Timestamp time.Time `gorm:"not null;index" json:"timestamp"`
// The four identifiers below are indexed uuid columns with no not-null
// constraint, backed by plain Go strings.
// NOTE(review): the zero value "" is not a valid uuid literal; confirm how
// unset identifiers are written (e.g. as NULL) before relying on them
// being optional.
CorrelationID string `gorm:"type:uuid;index" json:"correlation_id"`
CausationID string `gorm:"type:uuid;index" json:"causation_id"`
UserID string `gorm:"type:uuid;index" json:"user_id"`
TenantID string `gorm:"type:uuid;index" json:"tenant_id"`
// CreatedAt is tagged autoCreateTime, so GORM sets it when the row is
// created.
CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"`
}
StoredEvent represents an event stored in the event store
type StreamConfig ¶
type StreamConfig struct {
Name string `json:"name"`
Subjects []string `json:"subjects"`
// Retention is a free-form string; the trailing comment lists the values
// it is meant to take.
// NOTE(review): how the string is matched (case, unknown values) is not
// visible here; confirm against CreateStream.
Retention string `json:"retention"` // WorkQueue, Interest, Limits
// MaxAge is a time.Duration; encoding/json writes and reads it as integer
// nanoseconds.
MaxAge time.Duration `json:"max_age"`
MaxBytes int64 `json:"max_bytes"`
MaxMsgs int64 `json:"max_msgs"`
Replicas int `json:"replicas"`
// Storage is a free-form string; the trailing comment lists the values it
// is meant to take.
Storage string `json:"storage"` // File, Memory
}
StreamConfig holds JetStream configuration