 Documentation
      ¶
      Documentation
      ¶
    
    
  
    
  
    Index ¶
- type BaseEvent
- type Event
- type EventHandler
- type EventMessage
- type EventStore
- func (es *EventStore) AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, ...) error
- func (es *EventStore) Close() error
- func (es *EventStore) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, ...) error
- func (es *EventStore) GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
- func (es *EventStore) GetEventCount(ctx context.Context) (int64, error)
- func (es *EventStore) GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
- func (es *EventStore) GetEventsByCorrelationID(ctx context.Context, correlationID string) ([]*StoredEvent, error)
- func (es *EventStore) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
- func (es *EventStore) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
- func (es *EventStore) GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)
- func (es *EventStore) ReplayEvents(ctx context.Context, aggregateID, aggregateType string, ...) error
 
- type EventStoreConfig
- type EventStream
- type EventSubscription
- type NATSClient
- func (nc *NATSClient) Close() error
- func (nc *NATSClient) CreateStream(config StreamConfig) error
- func (nc *NATSClient) GetStats() nats.Statistics
- func (nc *NATSClient) GetStreamInfo(streamName string) (*StreamInfo, error)
- func (nc *NATSClient) HealthCheck(ctx context.Context) error
- func (nc *NATSClient) IsConnected() bool
- func (nc *NATSClient) ListStreams() ([]*StreamInfo, error)
- func (nc *NATSClient) PublishEvent(ctx context.Context, event *EventMessage) error
- func (nc *NATSClient) QueueSubscribe(handler EventHandler, queueGroup string) (*EventSubscription, error)
- func (nc *NATSClient) Subscribe(handler EventHandler) (*EventSubscription, error)
- func (nc *NATSClient) Unsubscribe(subscription *EventSubscription) error
 
- type NATSConfig
- type Snapshot
- type StoredEvent
- type StreamConfig
- type StreamInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseEvent ¶
type BaseEvent struct {
	AggregateID   string                 `json:"aggregate_id"`
	EventType     string                 `json:"event_type"`
	Data          map[string]interface{} `json:"data"`
	Metadata      map[string]interface{} `json:"metadata"`
	Version       int                    `json:"version"`
	Timestamp     time.Time              `json:"timestamp"`
	CorrelationID string                 `json:"correlation_id"`
	CausationID   string                 `json:"causation_id"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
}
    BaseEvent provides a base implementation for events
func (*BaseEvent) GetAggregateID ¶
GetAggregateID returns the aggregate ID
func (*BaseEvent) GetEventData ¶
GetEventData returns the event data
func (*BaseEvent) GetEventType ¶
GetEventType returns the event type
func (*BaseEvent) GetMetadata ¶
GetMetadata returns the event metadata
func (*BaseEvent) GetTimestamp ¶
GetTimestamp returns the event timestamp
func (*BaseEvent) GetVersion ¶
GetVersion returns the event version
type Event ¶
type Event interface {
	GetEventType() string
	GetAggregateID() string
	GetEventData() map[string]interface{}
	GetMetadata() map[string]interface{}
	GetVersion() int
	GetTimestamp() time.Time
}
    Event represents a domain event
type EventHandler ¶
type EventHandler interface {
	Handle(ctx context.Context, event *EventMessage) error
	GetSubject() string
	GetName() string
}
    EventHandler defines the interface for event handlers
type EventMessage ¶
type EventMessage struct {
	ID            string                 `json:"id"`
	Type          string                 `json:"type"`
	Source        string                 `json:"source"`
	Subject       string                 `json:"subject"`
	Data          map[string]interface{} `json:"data"`
	Timestamp     time.Time              `json:"timestamp"`
	Version       string                 `json:"version"`
	CorrelationID string                 `json:"correlation_id,omitempty"`
	Headers       map[string]string      `json:"headers,omitempty"`
}
    EventMessage represents an event message
type EventStore ¶
type EventStore struct {
	// contains filtered or unexported fields
}
    EventStore provides event sourcing capabilities
func NewEventStore ¶
func NewEventStore(db *gorm.DB, config EventStoreConfig, logger *logrus.Logger) (*EventStore, error)
NewEventStore creates a new event store
func (*EventStore) AppendEvents ¶
func (es *EventStore) AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, events []Event) error
AppendEvents appends events to an aggregate stream
func (*EventStore) CreateSnapshot ¶
func (es *EventStore) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, data interface{}) error
CreateSnapshot creates a snapshot of an aggregate
func (*EventStore) GetAggregateVersion ¶
func (es *EventStore) GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)
GetAggregateVersion returns the current version of an aggregate
func (*EventStore) GetEventCount ¶
func (es *EventStore) GetEventCount(ctx context.Context) (int64, error)
GetEventCount returns the total number of events
func (*EventStore) GetEventStream ¶
func (es *EventStore) GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)
GetEventStream retrieves an event stream for an aggregate
func (*EventStore) GetEventsByCorrelationID ¶
func (es *EventStore) GetEventsByCorrelationID(ctx context.Context, correlationID string) ([]*StoredEvent, error)
GetEventsByCorrelationID retrieves events by correlation ID
func (*EventStore) GetEventsByTimeRange ¶
func (es *EventStore) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)
GetEventsByTimeRange retrieves events within a time range
func (*EventStore) GetEventsByType ¶
func (es *EventStore) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)
GetEventsByType retrieves events by event type
func (*EventStore) GetLatestSnapshot ¶
func (es *EventStore) GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)
GetLatestSnapshot retrieves the latest snapshot for an aggregate
func (*EventStore) ReplayEvents ¶
func (es *EventStore) ReplayEvents(ctx context.Context, aggregateID, aggregateType string, eventHandler func(*StoredEvent) error) error
ReplayEvents replays events to rebuild aggregate state
type EventStoreConfig ¶
type EventStoreConfig struct {
	TableName        string        `json:"table_name"`
	SnapshotTable    string        `json:"snapshot_table"`
	SnapshotInterval int           `json:"snapshot_interval"`
	RetentionPeriod  time.Duration `json:"retention_period"`
	EnableSnapshot   bool          `json:"enable_snapshot"`
}
    EventStoreConfig holds event store configuration
type EventStream ¶
type EventStream struct {
	AggregateID     string         `json:"aggregate_id"`
	AggregateType   string         `json:"aggregate_type"`
	Version         int            `json:"version"`
	Events          []*StoredEvent `json:"events"`
	FromSnapshot    bool           `json:"from_snapshot"`
	SnapshotVersion int            `json:"snapshot_version"`
}
    EventStream represents a stream of events for an aggregate
type EventSubscription ¶
type EventSubscription struct {
	Subject      string             `json:"subject"`
	Handler      EventHandler       `json:"-"`
	Subscription *nats.Subscription `json:"-"`
	CreatedAt    time.Time          `json:"created_at"`
	IsActive     bool               `json:"is_active"`
	MessageCount uint64             `json:"message_count"`
}
    EventSubscription represents an active subscription
type NATSClient ¶
type NATSClient struct {
	// contains filtered or unexported fields
}
    NATSClient provides event streaming capabilities using NATS
func NewNATSClient ¶
func NewNATSClient(config NATSConfig, logger *logrus.Logger) (*NATSClient, error)
NewNATSClient creates a new NATS client
func (*NATSClient) CreateStream ¶
func (nc *NATSClient) CreateStream(config StreamConfig) error
CreateStream creates a JetStream stream
func (*NATSClient) GetStats ¶
func (nc *NATSClient) GetStats() nats.Statistics
GetStats returns connection statistics
func (*NATSClient) GetStreamInfo ¶
func (nc *NATSClient) GetStreamInfo(streamName string) (*StreamInfo, error)
GetStreamInfo returns information about a stream
func (*NATSClient) HealthCheck ¶
func (nc *NATSClient) HealthCheck(ctx context.Context) error
HealthCheck performs a health check
func (*NATSClient) IsConnected ¶
func (nc *NATSClient) IsConnected() bool
IsConnected returns the connection status
func (*NATSClient) ListStreams ¶
func (nc *NATSClient) ListStreams() ([]*StreamInfo, error)
ListStreams returns a list of all streams
func (*NATSClient) PublishEvent ¶
func (nc *NATSClient) PublishEvent(ctx context.Context, event *EventMessage) error
PublishEvent publishes an event to a subject
func (*NATSClient) QueueSubscribe ¶
func (nc *NATSClient) QueueSubscribe(handler EventHandler, queueGroup string) (*EventSubscription, error)
QueueSubscribe creates a queue subscription for load balancing
func (*NATSClient) Subscribe ¶
func (nc *NATSClient) Subscribe(handler EventHandler) (*EventSubscription, error)
Subscribe creates a subscription to a subject with a handler
func (*NATSClient) Unsubscribe ¶
func (nc *NATSClient) Unsubscribe(subscription *EventSubscription) error
Unsubscribe removes a subscription
type NATSConfig ¶
type NATSConfig struct {
	URL             string        `json:"url"`
	ClusterID       string        `json:"cluster_id"`
	ClientID        string        `json:"client_id"`
	MaxReconnect    int           `json:"max_reconnect"`
	ReconnectWait   time.Duration `json:"reconnect_wait"`
	Timeout         time.Duration `json:"timeout"`
	EnableJetStream bool          `json:"enable_jetstream"`
	StreamConfig    StreamConfig  `json:"stream_config"`
}
    NATSConfig holds NATS client configuration
type Snapshot ¶
type Snapshot struct {
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	AggregateID   string    `gorm:"type:uuid;not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"type:varchar(255);not null" json:"aggregate_type"`
	Version       int       `gorm:"not null" json:"version"`
	Data          string    `gorm:"type:jsonb;not null" json:"data"`
	Timestamp     time.Time `gorm:"not null;index" json:"timestamp"`
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
}
    Snapshot represents an aggregate snapshot
type StoredEvent ¶
type StoredEvent struct {
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	AggregateID   string    `gorm:"type:uuid;not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"type:varchar(255);not null;index" json:"aggregate_type"`
	EventType     string    `gorm:"type:varchar(255);not null" json:"event_type"`
	EventData     string    `gorm:"type:jsonb;not null" json:"event_data"`
	EventMetadata string    `gorm:"type:jsonb" json:"event_metadata"`
	Version       int       `gorm:"not null;index" json:"version"`
	Timestamp     time.Time `gorm:"not null;index" json:"timestamp"`
	CorrelationID string    `gorm:"type:uuid;index" json:"correlation_id"`
	CausationID   string    `gorm:"type:uuid;index" json:"causation_id"`
	UserID        string    `gorm:"type:uuid;index" json:"user_id"`
	TenantID      string    `gorm:"type:uuid;index" json:"tenant_id"`
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
}
    StoredEvent represents an event stored in the event store
type StreamConfig ¶
type StreamConfig struct {
	Name      string        `json:"name"`
	Subjects  []string      `json:"subjects"`
	Retention string        `json:"retention"` // WorkQueue, Interest, Limits
	MaxAge    time.Duration `json:"max_age"`
	MaxBytes  int64         `json:"max_bytes"`
	MaxMsgs   int64         `json:"max_msgs"`
	Replicas  int           `json:"replicas"`
	Storage   string        `json:"storage"` // File, Memory
}
    StreamConfig holds JetStream configuration