events

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2025 | License: MIT | Imports: 9 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseEvent

// BaseEvent is a concrete event record. The six Get* methods declared on
// *BaseEvent match the method set of the Event interface.
type BaseEvent struct {
	// Presumably the values behind GetAggregateID and GetEventType; the
	// method bodies are not visible here.
	AggregateID   string                 `json:"aggregate_id"`
	EventType     string                 `json:"event_type"`
	// Data and Metadata are untyped maps with string keys, serialized under
	// the JSON keys "data" and "metadata".
	Data          map[string]interface{} `json:"data"`
	Metadata      map[string]interface{} `json:"metadata"`
	// Version is an int here, whereas EventMessage.Version is a string.
	Version       int                    `json:"version"`
	Timestamp     time.Time              `json:"timestamp"`
	// The four ID fields below have no accessor in the Event interface.
	// NOTE(review): StoredEvent maps same-named fields to uuid columns, so
	// these presumably need to hold UUID strings before persisting — confirm.
	CorrelationID string                 `json:"correlation_id"`
	CausationID   string                 `json:"causation_id"`
	UserID        string                 `json:"user_id"`
	TenantID      string                 `json:"tenant_id"`
}

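BaseEvent provides a base implementation of the Event interface: the six pointer-receiver Get* methods listed below match every method that Event requires.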

func (*BaseEvent) GetAggregateID

func (e *BaseEvent) GetAggregateID() string

GetAggregateID returns the aggregate ID

func (*BaseEvent) GetEventData

func (e *BaseEvent) GetEventData() map[string]interface{}

GetEventData returns the event data

func (*BaseEvent) GetEventType

func (e *BaseEvent) GetEventType() string

GetEventType returns the event type

func (*BaseEvent) GetMetadata

func (e *BaseEvent) GetMetadata() map[string]interface{}

GetMetadata returns the event metadata

func (*BaseEvent) GetTimestamp

func (e *BaseEvent) GetTimestamp() time.Time

GetTimestamp returns the event timestamp

func (*BaseEvent) GetVersion

func (e *BaseEvent) GetVersion() int

GetVersion returns the event version

type Event

// Event is the accessor interface for a domain event. It is the element type
// accepted by EventStore.AppendEvents, and *BaseEvent declares all six
// methods.
//
// The interface exposes no correlation, causation, user or tenant ID, although
// BaseEvent and StoredEvent both carry those fields.
// NOTE(review): how AppendEvents populates those StoredEvent columns from an
// Event is not visible here — confirm.
type Event interface {
	// GetEventType returns the event type.
	GetEventType() string
	// GetAggregateID returns the aggregate ID.
	GetAggregateID() string
	// GetEventData returns the event data as an untyped string-keyed map.
	GetEventData() map[string]interface{}
	// GetMetadata returns the event metadata as an untyped string-keyed map.
	GetMetadata() map[string]interface{}
	// GetVersion returns the event version.
	GetVersion() int
	// GetTimestamp returns the event timestamp.
	GetTimestamp() time.Time
}

Event represents a domain event

type EventHandler

// EventHandler is the callback contract accepted by NATSClient.Subscribe and
// NATSClient.QueueSubscribe.
type EventHandler interface {
	// Handle receives one event message together with a context and reports
	// failure through the returned error.
	// NOTE(review): what the client does with a non-nil error (retry, nak,
	// log) is not visible here — confirm.
	Handle(ctx context.Context, event *EventMessage) error
	// GetSubject returns a subject string. Subscribe and QueueSubscribe take
	// no separate subject argument, so this is presumably the subject that
	// gets subscribed to — confirm.
	GetSubject() string
	// GetName returns the handler's name as a string.
	GetName() string
}

EventHandler defines the interface for event handlers

type EventMessage

// EventMessage is the message type passed to NATSClient.PublishEvent and
// delivered to EventHandler.Handle.
type EventMessage struct {
	ID            string                 `json:"id"`
	Type          string                 `json:"type"`
	Source        string                 `json:"source"`
	// PublishEvent takes no separate subject argument.
	// NOTE(review): presumably this field selects the publish subject — confirm.
	Subject       string                 `json:"subject"`
	// Data is an untyped string-keyed map serialized under "data".
	Data          map[string]interface{} `json:"data"`
	Timestamp     time.Time              `json:"timestamp"`
	// Version is a string here, whereas BaseEvent.Version and
	// StoredEvent.Version are ints.
	Version       string                 `json:"version"`
	// CorrelationID and Headers are tagged omitempty, so they are left out
	// of the JSON output when empty.
	CorrelationID string                 `json:"correlation_id,omitempty"`
	Headers       map[string]string      `json:"headers,omitempty"`
}

EventMessage represents an event message

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore provides event sourcing capabilities

func NewEventStore

func NewEventStore(db *gorm.DB, config EventStoreConfig, logger *logrus.Logger) (*EventStore, error)

NewEventStore creates a new event store

func (*EventStore) AppendEvents

func (es *EventStore) AppendEvents(ctx context.Context, aggregateID, aggregateType string, expectedVersion int, events []Event) error

AppendEvents appends events to an aggregate stream

func (*EventStore) Close

func (es *EventStore) Close() error

Close closes the event store

func (*EventStore) CreateSnapshot

func (es *EventStore) CreateSnapshot(ctx context.Context, aggregateID, aggregateType string, version int, data interface{}) error

CreateSnapshot creates a snapshot of an aggregate

func (*EventStore) GetAggregateVersion

func (es *EventStore) GetAggregateVersion(ctx context.Context, aggregateID, aggregateType string) (int, error)

GetAggregateVersion returns the current version of an aggregate

func (*EventStore) GetEventCount

func (es *EventStore) GetEventCount(ctx context.Context) (int64, error)

GetEventCount returns the total number of events

func (*EventStore) GetEventStream

func (es *EventStore) GetEventStream(ctx context.Context, aggregateID, aggregateType string, fromVersion int) (*EventStream, error)

GetEventStream retrieves an event stream for an aggregate

func (*EventStore) GetEventsByCorrelationID

func (es *EventStore) GetEventsByCorrelationID(ctx context.Context, correlationID string) ([]*StoredEvent, error)

GetEventsByCorrelationID retrieves events by correlation ID

func (*EventStore) GetEventsByTimeRange

func (es *EventStore) GetEventsByTimeRange(ctx context.Context, start, end time.Time, limit int) ([]*StoredEvent, error)

GetEventsByTimeRange retrieves events within a time range

func (*EventStore) GetEventsByType

func (es *EventStore) GetEventsByType(ctx context.Context, eventType string, limit int, offset int) ([]*StoredEvent, error)

GetEventsByType retrieves events by event type

func (*EventStore) GetLatestSnapshot

func (es *EventStore) GetLatestSnapshot(ctx context.Context, aggregateID, aggregateType string) (*Snapshot, error)

GetLatestSnapshot retrieves the latest snapshot for an aggregate

func (*EventStore) ReplayEvents

func (es *EventStore) ReplayEvents(ctx context.Context, aggregateID, aggregateType string, eventHandler func(*StoredEvent) error) error

ReplayEvents replays the stored events of the given aggregate by passing them to eventHandler, so the caller can rebuild aggregate state.

type EventStoreConfig

// EventStoreConfig is the configuration value passed to NewEventStore.
// NOTE(review): defaults applied to zero-valued fields, if any, are not
// visible here.
type EventStoreConfig struct {
	// TableName and SnapshotTable are plain strings; presumably the names of
	// the event and snapshot tables — confirm.
	TableName        string        `json:"table_name"`
	SnapshotTable    string        `json:"snapshot_table"`
	// SnapshotInterval is a bare int; its unit is not stated here.
	SnapshotInterval int           `json:"snapshot_interval"`
	// RetentionPeriod is a time.Duration, which encoding/json encodes as an
	// integer number of nanoseconds.
	RetentionPeriod  time.Duration `json:"retention_period"`
	EnableSnapshot   bool          `json:"enable_snapshot"`
}

EventStoreConfig holds event store configuration

type EventStream

// EventStream is the result type of EventStore.GetEventStream.
type EventStream struct {
	AggregateID     string         `json:"aggregate_id"`
	AggregateType   string         `json:"aggregate_type"`
	Version         int            `json:"version"`
	// Events holds pointers to stored events. A nil slice serializes as JSON
	// null rather than [].
	Events          []*StoredEvent `json:"events"`
	// NOTE(review): presumably FromSnapshot reports that a snapshot was used
	// and SnapshotVersion is that snapshot's version — confirm in
	// GetEventStream.
	FromSnapshot    bool           `json:"from_snapshot"`
	SnapshotVersion int            `json:"snapshot_version"`
}

EventStream represents a stream of events for an aggregate

type EventSubscription

// EventSubscription is returned by NATSClient.Subscribe and
// NATSClient.QueueSubscribe and accepted by NATSClient.Unsubscribe.
type EventSubscription struct {
	Subject      string             `json:"subject"`
	// Handler and Subscription are tagged json:"-", so they are excluded
	// from JSON output.
	Handler      EventHandler       `json:"-"`
	Subscription *nats.Subscription `json:"-"`
	CreatedAt    time.Time          `json:"created_at"`
	IsActive     bool               `json:"is_active"`
	// NOTE(review): if MessageCount and IsActive are updated from message
	// callbacks while callers read them, access needs synchronization —
	// confirm in the client implementation.
	MessageCount uint64             `json:"message_count"`
}

EventSubscription represents an active subscription

type NATSClient

type NATSClient struct {
	// contains filtered or unexported fields
}

NATSClient provides event streaming capabilities using NATS

func NewNATSClient

func NewNATSClient(config NATSConfig, logger *logrus.Logger) (*NATSClient, error)

NewNATSClient creates a new NATS client

func (*NATSClient) Close

func (nc *NATSClient) Close() error

Close closes the NATS connection

func (*NATSClient) CreateStream

func (nc *NATSClient) CreateStream(config StreamConfig) error

CreateStream creates a JetStream stream

func (*NATSClient) GetStats

func (nc *NATSClient) GetStats() nats.Statistics

GetStats returns connection statistics

func (*NATSClient) GetStreamInfo

func (nc *NATSClient) GetStreamInfo(streamName string) (*StreamInfo, error)

GetStreamInfo returns information about a stream

func (*NATSClient) HealthCheck

func (nc *NATSClient) HealthCheck(ctx context.Context) error

HealthCheck performs a health check

func (*NATSClient) IsConnected

func (nc *NATSClient) IsConnected() bool

IsConnected returns the connection status

func (*NATSClient) ListStreams

func (nc *NATSClient) ListStreams() ([]*StreamInfo, error)

ListStreams returns a list of all streams

func (*NATSClient) PublishEvent

func (nc *NATSClient) PublishEvent(ctx context.Context, event *EventMessage) error

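PublishEvent publishes an event message. It takes no separate subject argument, so the subject is presumably taken from the message's Subject field.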

func (*NATSClient) QueueSubscribe

func (nc *NATSClient) QueueSubscribe(handler EventHandler, queueGroup string) (*EventSubscription, error)

QueueSubscribe creates a queue subscription for the handler in the given queueGroup, so that messages are load-balanced across the members of that group.

func (*NATSClient) Subscribe

func (nc *NATSClient) Subscribe(handler EventHandler) (*EventSubscription, error)

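Subscribe creates a subscription for the given handler. It takes no separate subject argument, so the subject is presumably the one returned by handler.GetSubject().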

func (*NATSClient) Unsubscribe

func (nc *NATSClient) Unsubscribe(subscription *EventSubscription) error

Unsubscribe removes a subscription

type NATSConfig

// NATSConfig is the configuration value passed to NewNATSClient.
// NOTE(review): defaults applied to zero-valued fields, if any, are not
// visible here.
type NATSConfig struct {
	URL             string        `json:"url"`
	ClusterID       string        `json:"cluster_id"`
	ClientID        string        `json:"client_id"`
	MaxReconnect    int           `json:"max_reconnect"`
	// ReconnectWait and Timeout are time.Duration values, which
	// encoding/json encodes as integer nanoseconds.
	ReconnectWait   time.Duration `json:"reconnect_wait"`
	Timeout         time.Duration `json:"timeout"`
	EnableJetStream bool          `json:"enable_jetstream"`
	// StreamConfig is held by value and nested under "stream_config".
	// NOTE(review): presumably only used when EnableJetStream is true — confirm.
	StreamConfig    StreamConfig  `json:"stream_config"`
}

NATSConfig holds NATS client configuration

type Snapshot

// Snapshot is the GORM model returned by EventStore.GetLatestSnapshot.
// The uuid and jsonb column types and the gen_random_uuid() default are
// PostgreSQL-style; presumably the store targets PostgreSQL — confirm.
type Snapshot struct {
	// ID is the primary key; the database generates it via
	// gen_random_uuid() when none is supplied.
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	// AggregateID is a non-null, indexed uuid column held as a Go string.
	AggregateID   string    `gorm:"type:uuid;not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"type:varchar(255);not null" json:"aggregate_type"`
	Version       int       `gorm:"not null" json:"version"`
	// Data is a non-null jsonb column held as a Go string, so in JSON output
	// it appears as a quoted string rather than a nested object.
	Data          string    `gorm:"type:jsonb;not null" json:"data"`
	Timestamp     time.Time `gorm:"not null;index" json:"timestamp"`
	// CreatedAt is filled in by GORM on insert (autoCreateTime).
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
}

Snapshot represents an aggregate snapshot

type StoredEvent

// StoredEvent is the GORM model for a persisted event. It is the element type
// of EventStream.Events and of the EventStore query results, and the argument
// of the ReplayEvents callback.
//
// Every `index` tag below declares a separate single-column index; no
// composite or unique index over (aggregate_id, version) is declared here.
// NOTE(review): confirm that the expectedVersion check in AppendEvents is
// enforced elsewhere (transaction or migration-level constraint).
type StoredEvent struct {
	// ID is the primary key; the database generates it via
	// gen_random_uuid() when none is supplied.
	ID            string    `gorm:"type:uuid;primary_key;default:gen_random_uuid()" json:"id"`
	AggregateID   string    `gorm:"type:uuid;not null;index" json:"aggregate_id"`
	AggregateType string    `gorm:"type:varchar(255);not null;index" json:"aggregate_type"`
	EventType     string    `gorm:"type:varchar(255);not null" json:"event_type"`
	// EventData (non-null) and EventMetadata (nullable) are jsonb columns
	// held as Go strings, so in JSON output they appear as quoted strings.
	EventData     string    `gorm:"type:jsonb;not null" json:"event_data"`
	EventMetadata string    `gorm:"type:jsonb" json:"event_metadata"`
	Version       int       `gorm:"not null;index" json:"version"`
	Timestamp     time.Time `gorm:"not null;index" json:"timestamp"`
	// The four columns below are nullable uuid columns backed by plain Go
	// strings, whose zero value is "".
	// NOTE(review): an empty string is not a valid uuid literal in
	// PostgreSQL, and the same applies to EventMetadata as jsonb — confirm
	// how events without these values are written.
	CorrelationID string    `gorm:"type:uuid;index" json:"correlation_id"`
	CausationID   string    `gorm:"type:uuid;index" json:"causation_id"`
	UserID        string    `gorm:"type:uuid;index" json:"user_id"`
	TenantID      string    `gorm:"type:uuid;index" json:"tenant_id"`
	// CreatedAt is filled in by GORM on insert (autoCreateTime).
	CreatedAt     time.Time `gorm:"autoCreateTime" json:"created_at"`
}

StoredEvent represents an event stored in the event store

type StreamConfig

// StreamConfig is the argument of NATSClient.CreateStream and is also nested
// in NATSConfig.StreamConfig.
type StreamConfig struct {
	Name      string        `json:"name"`
	Subjects  []string      `json:"subjects"`
	// Retention and Storage are free-form strings; the trailing comments list
	// the expected values.
	// NOTE(review): handling of unrecognized values and case sensitivity is
	// not visible here — confirm.
	Retention string        `json:"retention"` // WorkQueue, Interest, Limits
	// MaxAge is a time.Duration, which encoding/json encodes as integer
	// nanoseconds.
	MaxAge    time.Duration `json:"max_age"`
	MaxBytes  int64         `json:"max_bytes"`
	MaxMsgs   int64         `json:"max_msgs"`
	Replicas  int           `json:"replicas"`
	Storage   string        `json:"storage"` // File, Memory
}

StreamConfig holds JetStream configuration

type StreamInfo

// StreamInfo is the result type of NATSClient.GetStreamInfo and the element
// type returned by NATSClient.ListStreams.
type StreamInfo struct {
	Name      string    `json:"name"`
	Subjects  []string  `json:"subjects"`
	CreatedAt time.Time `json:"created_at"`
	// Messages and Bytes are unsigned 64-bit counters.
	Messages  uint64    `json:"messages"`
	Bytes     uint64    `json:"bytes"`
	// State is a free-form string; its possible values are not visible here.
	State     string    `json:"state"`
}

StreamInfo contains information about a stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL