Documentation
¶
Index ¶
- type BaseEvent
- type Event
- type EventHandler
- type EventMessage
- type EventStore
- func (es *EventStore) AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, ...) error
- func (es *EventStore) Close() error
- func (es *EventStore) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, ...) error
- func (es *EventStore) GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
- func (es *EventStore) GetEventCount(ctx context.Context) (int64, error)
- func (es *EventStore) GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
- func (es *EventStore) GetEventsByCorrelationID(ctx context.Context, correlationID string) ([]*StoredEvent, error)
- func (es *EventStore) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
- func (es *EventStore) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
- func (es *EventStore) GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)
- func (es *EventStore) ReplayEvents(ctx context.Context, aggregateID, aggregateType string, ...) error
- type EventStoreConfig
- type EventStream
- type EventSubscription
- type NATSClient
- func (nc *NATSClient) Close() error
- func (nc *NATSClient) CreateStream(config StreamConfig) error
- func (nc *NATSClient) GetStats() nats.Statistics
- func (nc *NATSClient) GetStreamInfo(streamName string) (*StreamInfo, error)
- func (nc *NATSClient) HealthCheck(ctx context.Context) error
- func (nc *NATSClient) IsConnected() bool
- func (nc *NATSClient) ListStreams() ([]*StreamInfo, error)
- func (nc *NATSClient) PublishEvent(ctx context.Context, event *EventMessage) error
- func (nc *NATSClient) QueueSubscribe(handler EventHandler, queueGroup string) (*EventSubscription, error)
- func (nc *NATSClient) Subscribe(handler EventHandler) (*EventSubscription, error)
- func (nc *NATSClient) Unsubscribe(subscription *EventSubscription) error
- type NATSConfig
- type Snapshot
- type StoredEvent
- type StreamConfig
- type StreamInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseEvent ¶
type BaseEvent struct { AggregateID string `json:"aggregate_id"` EventType string `json:"event_type"` Data map[string]interface{} `json:"data"` Metadata map[string]interface{} `json:"metadata"` Version int `json:"version"` Timestamp time.Time `json:"timestamp"` CorrelationID string `json:"correlation_id"` CausationID string `json:"causation_id"` UserID string `json:"user_id"` TenantID string `json:"tenant_id"` }
BaseEvent provides a base implementation for events
func (*BaseEvent) GetAggregateID ¶
GetAggregateID returns the aggregate ID
func (*BaseEvent) GetEventData ¶
GetEventData returns the event data
func (*BaseEvent) GetEventType ¶
GetEventType returns the event type
func (*BaseEvent) GetMetadata ¶
GetMetadata returns the event metadata
func (*BaseEvent) GetTimestamp ¶
GetTimestamp returns the event timestamp
func (*BaseEvent) GetVersion ¶
GetVersion returns the event version
type Event ¶
type Event interface { GetEventType() string GetAggregateID() string GetEventData() map[string]interface{} GetMetadata() map[string]interface{} GetVersion() int GetTimestamp() time.Time }
Event represents a domain event
type EventHandler ¶
type EventHandler interface { Handle(ctx context.Context, event *EventMessage) error GetSubject() string GetName() string }
EventHandler defines the interface for event handlers
type EventMessage ¶
type EventMessage struct { ID string `json:"id"` Type string `json:"type"` Source string `json:"source"` Subject string `json:"subject"` Data map[string]interface{} `json:"data"` Timestamp time.Time `json:"timestamp"` Version string `json:"version"` CorrelationID string `json:"correlation_id,omitempty"` Headers map[string]string `json:"headers,omitempty"` }
EventMessage represents an event message
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore provides event sourcing capabilities
func NewEventStore ¶
func NewEventStore(db *gorm.DB, config EventStoreConfig, logger *logrus.Logger) (*EventStore, error)
NewEventStore creates a new event store
func (*EventStore) AppendEvents ¶
func (es *EventStore) AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, events []Event) error
AppendEvents appends events to an aggregate stream
func (*EventStore) CreateSnapshot ¶
func (es *EventStore) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, data interface{}) error
CreateSnapshot creates a snapshot of an aggregate
func (*EventStore) GetAggregateVersion ¶
func (es *EventStore) GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
GetAggregateVersion returns the current version of an aggregate
func (*EventStore) GetEventCount ¶
func (es *EventStore) GetEventCount(ctx context.Context) (int64, error)
GetEventCount returns the total number of events
func (*EventStore) GetEventStream ¶
func (es *EventStore) GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
GetEventStream retrieves an event stream for an aggregate
func (*EventStore) GetEventsByCorrelationID ¶
func (es *EventStore) GetEventsByCorrelationID(ctx context.Context, correlationID string) ([]*StoredEvent, error)
GetEventsByCorrelationID retrieves events by correlation ID
func (*EventStore) GetEventsByTimeRange ¶
func (es *EventStore) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
GetEventsByTimeRange retrieves events within a time range
func (*EventStore) GetEventsByType ¶
func (es *EventStore) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
GetEventsByType retrieves events by event type
func (*EventStore) GetLatestSnapshot ¶
func (es *EventStore) GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)
GetLatestSnapshot retrieves the latest snapshot for an aggregate
func (*EventStore) ReplayEvents ¶
func (es *EventStore) ReplayEvents(ctx context.Context, aggregateID, aggregateType string, eventHandler func(*StoredEvent) error) error
ReplayEvents replays events to rebuild aggregate state
type EventStoreConfig ¶
type EventStoreConfig struct { TableName string `json:"table_name"` SnapshotTable string `json:"snapshot_table"` SnapshotInterval int `json:"snapshot_interval"` RetentionPeriod time.Duration `json:"retention_period"` EnableSnapshot bool `json:"enable_snapshot"` }
EventStoreConfig holds event store configuration
type EventStream ¶
type EventStream struct { AggregateID string `json:"aggregate_id"` AggregateType string `json:"aggregate_type"` Version int `json:"version"` Events []*StoredEvent `json:"events"` FromSnapshot bool `json:"from_snapshot"` SnapshotVersion int `json:"snapshot_version"` }
EventStream represents a stream of events for an aggregate
type EventSubscription ¶
type EventSubscription struct { Subject string `json:"subject"` Handler EventHandler `json:"-"` Subscription *nats.Subscription `json:"-"` CreatedAt time.Time `json:"created_at"` IsActive bool `json:"is_active"` MessageCount uint64 `json:"message_count"` }
EventSubscription represents an active subscription
type NATSClient ¶
type NATSClient struct {
// contains filtered or unexported fields
}
NATSClient provides event streaming capabilities using NATS
func NewNATSClient ¶
func NewNATSClient(config NATSConfig, logger *logrus.Logger) (*NATSClient, error)
NewNATSClient creates a new NATS client
func (*NATSClient) CreateStream ¶
func (nc *NATSClient) CreateStream(config StreamConfig) error
CreateStream creates a JetStream stream
func (*NATSClient) GetStats ¶
func (nc *NATSClient) GetStats() nats.Statistics
GetStats returns connection statistics
func (*NATSClient) GetStreamInfo ¶
func (nc *NATSClient) GetStreamInfo(streamName string) (*StreamInfo, error)
GetStreamInfo returns information about a stream
func (*NATSClient) HealthCheck ¶
func (nc *NATSClient) HealthCheck(ctx context.Context) error
HealthCheck performs a health check
func (*NATSClient) IsConnected ¶
func (nc *NATSClient) IsConnected() bool
IsConnected returns the connection status
func (*NATSClient) ListStreams ¶
func (nc *NATSClient) ListStreams() ([]*StreamInfo, error)
ListStreams returns a list of all streams
func (*NATSClient) PublishEvent ¶
func (nc *NATSClient) PublishEvent(ctx context.Context, event *EventMessage) error
PublishEvent publishes an event to a subject
func (*NATSClient) QueueSubscribe ¶
func (nc *NATSClient) QueueSubscribe(handler EventHandler, queueGroup string) (*EventSubscription, error)
QueueSubscribe creates a queue subscription for load balancing
func (*NATSClient) Subscribe ¶
func (nc *NATSClient) Subscribe(handler EventHandler) (*EventSubscription, error)
Subscribe creates a subscription to a subject with a handler
func (*NATSClient) Unsubscribe ¶
func (nc *NATSClient) Unsubscribe(subscription *EventSubscription) error
Unsubscribe removes a subscription
type NATSConfig ¶
type NATSConfig struct { URL string `json:"url"` ClusterID string `json:"cluster_id"` ClientID string `json:"client_id"` MaxReconnect int `json:"max_reconnect"` ReconnectWait time.Duration `json:"reconnect_wait"` Timeout time.Duration `json:"timeout"` EnableJetStream bool `json:"enable_jetstream"` StreamConfig StreamConfig `json:"stream_config"` }
NATSConfig holds NATS client configuration
type Snapshot ¶
type Snapshot struct { ID string `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"` AggregateID string `gorm:"type:uuid;not null;index" json:"aggregate_id"` AggregateType string `gorm:"type:varchar(255);not null" json:"aggregate_type"` Version int `gorm:"not null" json:"version"` Data string `gorm:"type:jsonb;not null" json:"data"` Timestamp time.Time `gorm:"not null;index" json:"timestamp"` CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"` }
Snapshot represents an aggregate snapshot
type StoredEvent ¶
type StoredEvent struct { ID string `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"` AggregateID string `gorm:"type:uuid;not null;index" json:"aggregate_id"` AggregateType string `gorm:"type:varchar(255);not null;index" json:"aggregate_type"` EventType string `gorm:"type:varchar(255);not null" json:"event_type"` EventData string `gorm:"type:jsonb;not null" json:"event_data"` EventMetadata string `gorm:"type:jsonb" json:"event_metadata"` Version int `gorm:"not null;index" json:"version"` Timestamp time.Time `gorm:"not null;index" json:"timestamp"` CorrelationID string `gorm:"type:uuid;index" json:"correlation_id"` CausationID string `gorm:"type:uuid;index" json:"causation_id"` UserID string `gorm:"type:uuid;index" json:"user_id"` TenantID string `gorm:"type:uuid;index" json:"tenant_id"` CreatedAt time.Time `gorm:"autoCreateTime" json:"created_at"` }
StoredEvent represents an event stored in the event store
type StreamConfig ¶
type StreamConfig struct { Name string `json:"name"` Subjects []string `json:"subjects"` Retention string `json:"retention"` // WorkQueue, Interest, Limits MaxAge time.Duration `json:"max_age"` MaxBytes int64 `json:"max_bytes"` MaxMsgs int64 `json:"max_msgs"` Replicas int `json:"replicas"` Storage string `json:"storage"` // File, Memory }
StreamConfig holds JetStream configuration