adapters

package
v1.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package adapters provides interfaces for event store backends.

Package adapters provides interfaces and shared utilities for event store backends.

Index

Constants

View Source
const (
	// AnyVersion skips version checking. Use when you don't care about concurrent modifications.
	AnyVersion int64 = -1

	// NoStream requires the stream to not exist. Use for creating new streams.
	NoStream int64 = 0

	// StreamExists requires the stream to exist. Use when you expect to append to an existing stream.
	StreamExists int64 = -2
)

Version constants for optimistic concurrency control. These constants define special version values used in Append operations.

Variables

View Source
var (
	// ErrConcurrencyConflict is returned when optimistic concurrency check fails.
	ErrConcurrencyConflict = errors.New("mink: concurrency conflict")

	// ErrStreamNotFound is returned when a stream does not exist.
	ErrStreamNotFound = errors.New("mink: stream not found")

	// ErrEmptyStreamID is returned when an empty stream ID is provided.
	ErrEmptyStreamID = errors.New("mink: stream ID is required")

	// ErrNoEvents is returned when attempting to append zero events.
	ErrNoEvents = errors.New("mink: no events to append")

	// ErrInvalidVersion is returned when an invalid version is specified.
	ErrInvalidVersion = errors.New("mink: invalid version")

	// ErrAdapterClosed is returned when operations are attempted on a closed adapter.
	ErrAdapterClosed = errors.New("mink: adapter is closed")

	// ErrSagaNotFound indicates the requested saga does not exist.
	ErrSagaNotFound = errors.New("mink: saga not found")

	// ErrSagaAlreadyExists indicates a saga with the same ID already exists.
	ErrSagaAlreadyExists = errors.New("mink: saga already exists")

	// ErrNilAggregate indicates a nil aggregate was passed.
	ErrNilAggregate = errors.New("mink: nil aggregate")

	// ErrOutboxMessageNotFound indicates the requested outbox message does not exist.
	ErrOutboxMessageNotFound = errors.New("mink: outbox message not found")
)

Sentinel errors for adapter implementations. Adapters should return these (or errors that match via errors.Is) to enable consistent error handling across different backends.

Functions

func CheckVersion

func CheckVersion(streamID string, expected, current int64, exists bool) error

CheckVersion validates the expected version against the current version. This implements the optimistic concurrency control logic shared by all adapters.

Parameters:

  • streamID: The stream identifier (used for error messages)
  • expected: The expected version (can be AnyVersion, NoStream, StreamExists, or a positive version)
  • current: The current version of the stream
  • exists: Whether the stream currently exists

Returns nil if the version check passes, or an appropriate error otherwise.

func DefaultLimit

func DefaultLimit(limit, defaultValue int) int

DefaultLimit returns a default limit value if the provided limit is invalid. Used for pagination in LoadFromPosition and similar methods.

func ExtractCategory

func ExtractCategory(streamID string) string

ExtractCategory extracts the category from a stream ID. Stream IDs are expected to follow the format "Category-ID" (e.g., "Order-123"). The category is the portion before the first hyphen.

Behavior:

  • "Order-123" returns "Order"
  • "User-abc-def" returns "User" (only splits on first hyphen)
  • "NoHyphen" returns "NoHyphen" (entire ID if no hyphen)
  • "" returns "" (empty string for empty input)

Types

type CheckpointAdapter

type CheckpointAdapter interface {
	// GetCheckpoint returns the last processed position for a projection.
	// Returns 0 if no checkpoint exists.
	GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)

	// SetCheckpoint stores the last processed position for a projection.
	SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
}

CheckpointAdapter manages projection checkpoints.

type ConcurrencyError

type ConcurrencyError struct {
	StreamID        string
	ExpectedVersion int64
	ActualVersion   int64
}

ConcurrencyError provides details about a concurrency conflict. It is returned when an optimistic concurrency check fails during Append operations.

func NewConcurrencyError

func NewConcurrencyError(streamID string, expected, actual int64) *ConcurrencyError

NewConcurrencyError creates a new ConcurrencyError.

func (*ConcurrencyError) Error

func (e *ConcurrencyError) Error() string

Error implements the error interface.

func (*ConcurrencyError) Is

func (e *ConcurrencyError) Is(target error) bool

Is implements errors.Is compatibility. Returns true when compared with ErrConcurrencyConflict.

type DiagnosticAdapter

type DiagnosticAdapter interface {
	// Ping checks if the database connection is healthy.
	Ping(ctx context.Context) error

	// GetDiagnosticInfo returns database version and connection status.
	GetDiagnosticInfo(ctx context.Context) (*DiagnosticInfo, error)

	// CheckSchema verifies the event store schema exists.
	CheckSchema(ctx context.Context, tableName string) (*SchemaCheckResult, error)

	// GetProjectionHealth returns projection health status.
	GetProjectionHealth(ctx context.Context) (*ProjectionHealthResult, error)
}

DiagnosticAdapter provides diagnostic capabilities for CLI tools.

type DiagnosticInfo

type DiagnosticInfo struct {
	// Version is the database server version (e.g., "PostgreSQL 16.1").
	Version string

	// Connected indicates if the connection is healthy.
	Connected bool

	// Message provides additional status information.
	Message string
}

DiagnosticInfo contains database diagnostic information.

type EventRecord

type EventRecord struct {
	// Type is the event type identifier.
	Type string

	// Data is the serialized event payload.
	Data []byte

	// Metadata contains optional contextual information.
	Metadata Metadata
}

EventRecord represents an event to be appended to a stream. This is the adapter-level representation of an event.

type EventStoreAdapter

type EventStoreAdapter interface {
	// Append stores events to the specified stream with optimistic concurrency control.
	// expectedVersion specifies the expected current version of the stream:
	//   - AnyVersion (-1): Skip version check
	//   - NoStream (0): Stream must not exist
	//   - StreamExists (-2): Stream must exist
	//   - Any positive number: Stream must be at this exact version
	// Returns the stored events with their assigned positions, or an error.
	Append(ctx context.Context, streamID string, events []EventRecord, expectedVersion int64) ([]StoredEvent, error)

	// Load retrieves all events from a stream starting from the specified version.
	// Use fromVersion=0 to load all events.
	Load(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)

	// GetStreamInfo returns metadata about a stream.
	// Returns ErrStreamNotFound if the stream does not exist.
	GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)

	// GetLastPosition returns the global position of the last stored event.
	// Returns 0 if no events exist.
	GetLastPosition(ctx context.Context) (uint64, error)

	// Initialize sets up the required database schema.
	// This should be called once during application startup.
	Initialize(ctx context.Context) error

	// Close releases any resources held by the adapter.
	Close() error
}

EventStoreAdapter is the interface that database adapters must implement. It provides the low-level operations for persisting and retrieving events.

type EventStoreStats

type EventStoreStats struct {
	// TotalEvents is the total number of events across all streams.
	TotalEvents int64

	// TotalStreams is the number of unique streams.
	TotalStreams int64

	// EventTypes is the number of unique event types.
	EventTypes int64

	// AvgEventsPerStream is the average events per stream.
	AvgEventsPerStream float64

	// TopEventTypes contains the most common event types.
	TopEventTypes []EventTypeCount
}

EventStoreStats contains aggregate statistics about the event store.

type EventTypeCount

type EventTypeCount struct {
	// Type is the event type name.
	Type string

	// Count is the number of occurrences.
	Count int64
}

EventTypeCount holds an event type and its count.

type HealthChecker

type HealthChecker interface {
	// Ping checks if the adapter can connect to its backend.
	Ping(ctx context.Context) error
}

HealthChecker provides health check capabilities.

type IdempotencyRecord

type IdempotencyRecord struct {
	// Key is the idempotency key.
	Key string `json:"key"`

	// CommandType is the type of the processed command.
	CommandType string `json:"commandType"`

	// AggregateID is the ID of the affected aggregate (if any).
	AggregateID string `json:"aggregateId,omitempty"`

	// Version is the aggregate version after processing (if any).
	Version int64 `json:"version,omitempty"`

	// Response contains serialized response data (optional).
	Response []byte `json:"response,omitempty"`

	// Error contains the error message if the command failed.
	Error string `json:"error,omitempty"`

	// Success indicates if the command was processed successfully.
	Success bool `json:"success"`

	// ProcessedAt is when the command was processed.
	ProcessedAt time.Time `json:"processedAt"`

	// ExpiresAt is when the record should expire.
	ExpiresAt time.Time `json:"expiresAt"`
}

IdempotencyRecord stores information about a processed command.

func CopyIdempotencyRecord

func CopyIdempotencyRecord(record *IdempotencyRecord) *IdempotencyRecord

CopyIdempotencyRecord creates a deep copy of an IdempotencyRecord. This is useful to avoid external mutations of stored records.

func (*IdempotencyRecord) IsExpired

func (r *IdempotencyRecord) IsExpired() bool

IsExpired returns true if the record has expired.

type IdempotencyStore

type IdempotencyStore interface {
	// Exists checks if a command with the given key was already processed.
	Exists(ctx context.Context, key string) (bool, error)

	// Store records that a command was processed.
	Store(ctx context.Context, record *IdempotencyRecord) error

	// Get retrieves the idempotency record for a key.
	// Returns nil, nil if the key doesn't exist.
	Get(ctx context.Context, key string) (*IdempotencyRecord, error)

	// Delete removes an idempotency record.
	Delete(ctx context.Context, key string) error

	// Cleanup removes expired records.
	Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
}

IdempotencyStore tracks processed commands to prevent duplicate processing. Adapters may implement this to support command idempotency.

type Metadata

type Metadata struct {
	// CorrelationID links related events across services.
	CorrelationID string `json:"correlationId,omitempty"`

	// CausationID identifies the event that caused this event.
	CausationID string `json:"causationId,omitempty"`

	// UserID identifies who triggered this event.
	UserID string `json:"userId,omitempty"`

	// TenantID for multi-tenant applications.
	TenantID string `json:"tenantId,omitempty"`

	// Custom holds any additional metadata.
	Custom map[string]string `json:"custom,omitempty"`
}

Metadata contains event context for tracing and multi-tenancy. These fields are preserved across serialization and can be used for correlation, audit trails, and multi-tenant isolation.

type MigrationAdapter

type MigrationAdapter interface {
	// GetAppliedMigrations returns the list of applied migration names.
	GetAppliedMigrations(ctx context.Context) ([]string, error)

	// RecordMigration marks a migration as applied.
	RecordMigration(ctx context.Context, name string) error

	// RemoveMigrationRecord removes a migration record (for rollback).
	RemoveMigrationRecord(ctx context.Context, name string) error

	// ExecuteSQL runs arbitrary SQL (for applying migrations).
	ExecuteSQL(ctx context.Context, sql string) error
}

MigrationAdapter provides migration management capabilities for CLI tools.

type MigrationInfo

type MigrationInfo struct {
	// Name is the migration identifier.
	Name string

	// AppliedAt is when the migration was applied (zero if pending).
	AppliedAt time.Time

	// Applied indicates if this migration has been run.
	Applied bool
}

MigrationInfo contains information about a database migration.

type Migrator

type Migrator interface {
	// Migrate runs pending database migrations.
	Migrate(ctx context.Context) error

	// MigrationVersion returns the current migration version.
	MigrationVersion(ctx context.Context) (int, error)
}

Migrator provides schema migration capabilities.

type OutboxAppender

type OutboxAppender interface {
	AppendWithOutbox(ctx context.Context, streamID string, events []EventRecord, expectedVersion int64, outboxMessages []*OutboxMessage) ([]StoredEvent, error)
}

OutboxAppender is an optional interface for adapters that support atomic event append + outbox schedule in a single transaction.

type OutboxMessage

type OutboxMessage struct {
	// ID is the unique message identifier.
	ID string `json:"id"`

	// AggregateID identifies the aggregate that produced this message.
	AggregateID string `json:"aggregateId"`

	// EventType is the type of event this message carries.
	EventType string `json:"eventType"`

	// Destination specifies where to publish (e.g., "webhook:https://...", "kafka:topic").
	Destination string `json:"destination"`

	// Payload is the serialized message content.
	Payload []byte `json:"payload"`

	// Headers contains additional key-value metadata for the message.
	Headers map[string]string `json:"headers,omitempty"`

	// Status is the current processing status.
	Status OutboxStatus `json:"status"`

	// Attempts is the number of delivery attempts made.
	Attempts int `json:"attempts"`

	// MaxAttempts is the maximum number of delivery attempts allowed.
	MaxAttempts int `json:"maxAttempts"`

	// LastError contains the error from the most recent failed attempt.
	LastError string `json:"lastError,omitempty"`

	// ScheduledAt is when the message was scheduled for delivery.
	ScheduledAt time.Time `json:"scheduledAt"`

	// LastAttemptAt is when the last delivery attempt was made.
	LastAttemptAt *time.Time `json:"lastAttemptAt,omitempty"`

	// ProcessedAt is when the message was successfully delivered.
	ProcessedAt *time.Time `json:"processedAt,omitempty"`

	// CreatedAt is when the message was created.
	CreatedAt time.Time `json:"createdAt"`
}

OutboxMessage represents a message in the transactional outbox.

type OutboxStatus

type OutboxStatus int

OutboxStatus represents the current status of an outbox message.

const (
	// OutboxPending indicates the message is waiting to be processed.
	OutboxPending OutboxStatus = iota

	// OutboxProcessing indicates the message is currently being processed.
	OutboxProcessing

	// OutboxCompleted indicates the message was successfully published.
	OutboxCompleted

	// OutboxFailed indicates the message failed to publish.
	OutboxFailed

	// OutboxDeadLetter indicates the message exceeded max retry attempts.
	OutboxDeadLetter
)

func (OutboxStatus) String

func (s OutboxStatus) String() string

String returns the string representation of the outbox status.

type OutboxStore

type OutboxStore interface {
	// Schedule stores outbox messages for later processing.
	Schedule(ctx context.Context, messages []*OutboxMessage) error

	// ScheduleInTx stores outbox messages within an existing database transaction.
	// The tx parameter is adapter-specific (e.g., *sql.Tx for PostgreSQL).
	// This enables atomic event+outbox writes.
	ScheduleInTx(ctx context.Context, tx interface{}, messages []*OutboxMessage) error

	// FetchPending atomically claims up to limit pending messages for processing.
	// Claimed messages are transitioned to OutboxProcessing status.
	FetchPending(ctx context.Context, limit int) ([]*OutboxMessage, error)

	// MarkCompleted marks messages as successfully delivered.
	MarkCompleted(ctx context.Context, ids []string) error

	// MarkFailed marks a message as failed with an error description.
	MarkFailed(ctx context.Context, id string, lastErr error) error

	// RetryFailed resets eligible failed messages (below maxAttempts) to pending.
	RetryFailed(ctx context.Context, maxAttempts int) (int64, error)

	// MoveToDeadLetter transitions messages that exceeded maxAttempts to dead letter.
	MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)

	// GetDeadLetterMessages retrieves dead-lettered messages.
	GetDeadLetterMessages(ctx context.Context, limit int) ([]*OutboxMessage, error)

	// Cleanup removes old completed messages.
	Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

	// Initialize sets up the required storage schema.
	Initialize(ctx context.Context) error

	// Close releases any resources held by the store.
	Close() error
}

OutboxStore defines the interface for outbox message persistence.

type ProjectionHealthResult

type ProjectionHealthResult struct {
	// TotalProjections is the number of registered projections.
	TotalProjections int64

	// ProjectionsBehind is the number of projections that are behind.
	ProjectionsBehind int64

	// MaxPosition is the highest global position in the event store.
	MaxPosition int64

	// Message provides additional information.
	Message string
}

ProjectionHealthResult contains projection health information.

type ProjectionInfo

type ProjectionInfo struct {
	// Name is the projection identifier.
	Name string

	// Position is the last processed global position.
	Position int64

	// Status is the projection state (active, paused, etc.).
	Status string

	// UpdatedAt is when the projection was last updated.
	UpdatedAt time.Time
}

ProjectionInfo contains projection status information.

type ProjectionQueryAdapter

type ProjectionQueryAdapter interface {
	// ListProjections returns all registered projections.
	ListProjections(ctx context.Context) ([]ProjectionInfo, error)

	// GetProjection returns information about a specific projection.
	// Returns nil, nil if the projection doesn't exist.
	GetProjection(ctx context.Context, name string) (*ProjectionInfo, error)

	// SetProjectionStatus updates a projection's status (active, paused).
	SetProjectionStatus(ctx context.Context, name string, status string) error

	// ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.
	ResetProjectionCheckpoint(ctx context.Context, name string) error

	// GetTotalEventCount returns the highest global position (for progress display).
	GetTotalEventCount(ctx context.Context) (int64, error)
}

ProjectionQueryAdapter provides projection management capabilities for CLI tools.

type SagaNotFoundError

type SagaNotFoundError struct {
	SagaID        string
	CorrelationID string
}

SagaNotFoundError provides detailed information about a missing saga.

func (*SagaNotFoundError) Error

func (e *SagaNotFoundError) Error() string

Error returns the error message.

func (*SagaNotFoundError) Is

func (e *SagaNotFoundError) Is(target error) bool

Is reports whether this error matches the target error.

func (*SagaNotFoundError) Unwrap

func (e *SagaNotFoundError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type SagaState

type SagaState struct {
	// ID is the unique saga identifier.
	ID string `json:"id"`

	// Type is the saga type.
	Type string `json:"type"`

	// CorrelationID links this saga to related events/commands.
	CorrelationID string `json:"correlationId,omitempty"`

	// Status is the current saga status.
	Status SagaStatus `json:"status"`

	// CurrentStep is the current step number.
	CurrentStep int `json:"currentStep"`

	// Data contains the saga's internal state.
	Data map[string]interface{} `json:"data,omitempty"`

	// ProcessedEvents contains event keys that have been processed by this saga.
	// This is used internally by the SagaManager for idempotency tracking and
	// should not be modified directly by saga implementations.
	// Each entry is formatted as "eventID:globalPosition".
	ProcessedEvents []string `json:"processedEvents,omitempty"`

	// Steps contains the history of executed steps.
	Steps []SagaStep `json:"steps,omitempty"`

	// StartedAt is when the saga started.
	StartedAt time.Time `json:"startedAt"`

	// UpdatedAt is when the saga was last updated.
	UpdatedAt time.Time `json:"updatedAt"`

	// CompletedAt is when the saga completed (nil if not completed).
	CompletedAt *time.Time `json:"completedAt,omitempty"`

	// FailureReason contains the error message if the saga failed.
	FailureReason string `json:"failureReason,omitempty"`

	// Version for optimistic concurrency control.
	// Version 0 indicates a new saga that has not been saved yet.
	// After each successful save, the version is incremented by 1.
	// When updating an existing saga, the version must match the current
	// stored version; otherwise, ErrConcurrencyConflict is returned.
	Version int64 `json:"version"`
}

SagaState represents the persisted state of a saga. This is the data structure stored by the SagaStore.

func (*SagaState) IsTerminal

func (s *SagaState) IsTerminal() bool

IsTerminal returns true if the saga is in a terminal state.

type SagaStatus

type SagaStatus int

SagaStatus represents the current status of a saga.

const (
	// SagaStatusStarted indicates the saga has started but not completed.
	SagaStatusStarted SagaStatus = iota

	// SagaStatusRunning indicates the saga is actively processing.
	SagaStatusRunning

	// SagaStatusCompleted indicates the saga completed successfully.
	SagaStatusCompleted

	// SagaStatusFailed indicates the saga failed without compensation.
	SagaStatusFailed

	// SagaStatusCompensating indicates the saga is executing compensating actions.
	SagaStatusCompensating

	// SagaStatusCompensated indicates the saga has been compensated after failure.
	SagaStatusCompensated

	// SagaStatusCompensationFailed indicates compensation failed (partial rollback).
	SagaStatusCompensationFailed
)

func (SagaStatus) IsTerminal

func (s SagaStatus) IsTerminal() bool

IsTerminal returns true if the saga status is a terminal state.

func (SagaStatus) String

func (s SagaStatus) String() string

String returns the string representation of the saga status.

type SagaStep

type SagaStep struct {
	// Name is the human-readable name of the step.
	Name string `json:"name"`

	// Index is the step number (0-based).
	Index int `json:"index"`

	// Status is the current status of this step.
	Status SagaStepStatus `json:"status"`

	// Command is the command that was executed for this step.
	Command string `json:"command,omitempty"`

	// CompletedAt is when this step completed.
	CompletedAt *time.Time `json:"completedAt,omitempty"`

	// Error contains any error message if the step failed.
	Error string `json:"error,omitempty"`
}

SagaStep represents a single step in a saga.

type SagaStepStatus

type SagaStepStatus int

SagaStepStatus represents the status of a saga step.

const (
	// SagaStepPending indicates the step has not started.
	SagaStepPending SagaStepStatus = iota

	// SagaStepRunning indicates the step is in progress.
	SagaStepRunning

	// SagaStepCompleted indicates the step completed successfully.
	SagaStepCompleted

	// SagaStepFailed indicates the step failed.
	SagaStepFailed

	// SagaStepCompensated indicates the step was compensated.
	SagaStepCompensated
)

func (SagaStepStatus) String

func (s SagaStepStatus) String() string

String returns the string representation of the step status.

type SagaStore

type SagaStore interface {
	// Save persists a saga state with optimistic concurrency control.
	//
	// Version semantics:
	//   - Version 0: Creates a new saga. Returns error if saga already exists
	//     (for some implementations).
	//   - Version > 0: Updates an existing saga. The version must match the
	//     current stored version, otherwise ErrConcurrencyConflict is returned.
	//
	// After a successful save, state.Version is incremented to reflect the
	// new version stored in the database.
	Save(ctx context.Context, state *SagaState) error

	// Load retrieves a saga state by ID.
	// Returns ErrSagaNotFound if the saga doesn't exist.
	Load(ctx context.Context, sagaID string) (*SagaState, error)

	// FindByCorrelationID finds a saga by its correlation ID.
	// Returns ErrSagaNotFound if no saga is found.
	FindByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)

	// FindByType finds all sagas of a given type with the specified status.
	// If statuses is empty, returns sagas of all statuses.
	FindByType(ctx context.Context, sagaType string, statuses ...SagaStatus) ([]*SagaState, error)

	// Delete removes a saga state.
	// Returns ErrSagaNotFound if the saga doesn't exist.
	Delete(ctx context.Context, sagaID string) error

	// Close releases any resources held by the store.
	Close() error
}

SagaStore defines the interface for saga persistence.

type SchemaCheckResult

type SchemaCheckResult struct {
	// TableExists indicates if the events table exists.
	TableExists bool

	// EventCount is the number of events in the store.
	EventCount int64

	// Message provides additional information.
	Message string
}

SchemaCheckResult contains information about the event store schema.

type SchemaProvider

type SchemaProvider interface {
	// GenerateSchema returns the DDL for the event store schema.
	// tableName is the events table name.
	// snapshotTableName is the snapshots table name.
	// outboxTableName is the outbox table name.
	GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
}

SchemaProvider generates database-specific schema SQL.

type SnapshotAdapter

type SnapshotAdapter interface {
	// SaveSnapshot stores a snapshot for the given stream.
	SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error

	// LoadSnapshot retrieves the latest snapshot for the given stream.
	// Returns nil, nil if no snapshot exists.
	LoadSnapshot(ctx context.Context, streamID string) (*SnapshotRecord, error)

	// DeleteSnapshot removes the snapshot for the given stream.
	DeleteSnapshot(ctx context.Context, streamID string) error
}

SnapshotAdapter stores aggregate snapshots for faster loading.

type SnapshotRecord

type SnapshotRecord struct {
	// StreamID is the stream identifier.
	StreamID string

	// Version is the aggregate version at the time of the snapshot.
	Version int64

	// Data is the serialized snapshot payload.
	Data []byte
}

SnapshotRecord represents a stored aggregate snapshot.

type StoredEvent

type StoredEvent struct {
	// ID is the unique event identifier.
	ID string

	// StreamID is the stream this event belongs to.
	StreamID string

	// Type is the event type identifier.
	Type string

	// Data is the serialized event payload.
	Data []byte

	// Metadata contains contextual information.
	Metadata Metadata

	// Version is the position within the stream (1-based).
	Version int64

	// GlobalPosition is the global ordering position across all streams.
	GlobalPosition uint64

	// Timestamp is when the event was stored.
	Timestamp time.Time
}

StoredEvent represents a persisted event with its storage metadata. This is returned when loading events from the store.

type StreamInfo

type StreamInfo struct {
	// StreamID is the stream identifier.
	StreamID string

	// Category is the aggregate type (first part of stream ID).
	Category string

	// Version is the current stream version.
	Version int64

	// EventCount is the number of events in the stream.
	EventCount int64

	// CreatedAt is when the first event was stored.
	CreatedAt time.Time

	// UpdatedAt is when the last event was stored.
	UpdatedAt time.Time
}

StreamInfo contains metadata about an event stream.

type StreamNotFoundError

type StreamNotFoundError struct {
	StreamID string
}

StreamNotFoundError provides details about a missing stream. It is returned when an operation requires an existing stream that doesn't exist.

func NewStreamNotFoundError

func NewStreamNotFoundError(streamID string) *StreamNotFoundError

NewStreamNotFoundError creates a new StreamNotFoundError.

func (*StreamNotFoundError) Error

func (e *StreamNotFoundError) Error() string

Error implements the error interface.

func (*StreamNotFoundError) Is

func (e *StreamNotFoundError) Is(target error) bool

Is implements errors.Is compatibility. Returns true when compared with ErrStreamNotFound.

type StreamQueryAdapter

type StreamQueryAdapter interface {
	// ListStreams returns a list of stream summaries.
	// prefix filters streams by ID prefix (empty string for all).
	// limit caps the number of results (0 for unlimited).
	ListStreams(ctx context.Context, prefix string, limit int) ([]StreamSummary, error)

	// GetStreamEvents returns events from a stream with pagination.
	// fromVersion starts at this version (0 for beginning).
	// limit caps the number of events returned.
	GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]StoredEvent, error)

	// GetEventStoreStats returns aggregate statistics about the event store.
	GetEventStoreStats(ctx context.Context) (*EventStoreStats, error)
}

StreamQueryAdapter provides stream inspection capabilities for CLI tools. This allows querying streams without direct SQL access.

type StreamSummary

type StreamSummary struct {
	// StreamID is the stream identifier.
	StreamID string

	// EventCount is the number of events in the stream.
	EventCount int64

	// LastEventType is the type of the most recent event.
	LastEventType string

	// LastUpdated is when the last event was stored.
	LastUpdated time.Time
}

StreamSummary contains summary information about a stream for listing.

type SubscriptionAdapter

type SubscriptionAdapter interface {
	// LoadFromPosition loads events starting from a global position.
	// This is used by projection engines to catch up on historical events.
	LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

	// SubscribeAll subscribes to all events across all streams.
	// Events are delivered starting from the specified global position.
	// Optional SubscriptionOptions can be provided to configure behavior.
	SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

	// SubscribeStream subscribes to events from a specific stream.
	// Events are delivered starting from the specified version.
	// Optional SubscriptionOptions can be provided to configure behavior.
	SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)

	// SubscribeCategory subscribes to all events from streams in a category.
	// Events are delivered starting from the specified global position.
	// Optional SubscriptionOptions can be provided to configure behavior.
	SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)
}

SubscriptionAdapter provides event subscription capabilities. Adapters may optionally implement this interface for real-time event streaming.

type SubscriptionOptions

type SubscriptionOptions struct {
	// BufferSize is the size of the event channel buffer.
	// Default: 100
	BufferSize int

	// PollInterval is how often to poll for new events (for polling-based adapters).
	// Default: 100ms
	PollInterval time.Duration

	// OnError is called when an error occurs during subscription.
	// If nil, errors may be logged or silently retried depending on the adapter.
	OnError func(err error)
}

SubscriptionOptions configures subscription behavior. Adapters may support additional options beyond these common ones.

type Transaction

type Transaction interface {
	// Commit commits the transaction.
	Commit() error

	// Rollback aborts the transaction.
	Rollback() error

	// Adapter returns an adapter that operates within this transaction.
	Adapter() EventStoreAdapter
}

Transaction represents a database transaction.

type TransactionalAdapter

type TransactionalAdapter interface {
	// BeginTx starts a new transaction.
	BeginTx(ctx context.Context) (Transaction, error)
}

TransactionalAdapter provides transaction support. Adapters may optionally implement this for atomic operations.

Directories

Path Synopsis
Package memory provides an in-memory implementation of the event store adapter.
Package memory provides an in-memory implementation of the event store adapter.
Package postgres provides a PostgreSQL implementation of the event store adapter.
Package postgres provides a PostgreSQL implementation of the event store adapter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL