Documentation ¶
Overview ¶
Package adapters provides interfaces and shared utilities for event store backends.
Index ¶
- Constants
- Variables
- func CheckVersion(streamID string, expected, current int64, exists bool) error
- func DefaultLimit(limit, defaultValue int) int
- func ExtractCategory(streamID string) string
- type CheckpointAdapter
- type ConcurrencyError
- type DiagnosticAdapter
- type DiagnosticInfo
- type EventRecord
- type EventStoreAdapter
- type EventStoreStats
- type EventTypeCount
- type HealthChecker
- type IdempotencyRecord
- type IdempotencyStore
- type Metadata
- type MigrationAdapter
- type MigrationInfo
- type Migrator
- type OutboxAppender
- type OutboxMessage
- type OutboxStatus
- type OutboxStore
- type ProjectionHealthResult
- type ProjectionInfo
- type ProjectionQueryAdapter
- type SagaNotFoundError
- type SagaState
- type SagaStatus
- type SagaStep
- type SagaStepStatus
- type SagaStore
- type SchemaCheckResult
- type SchemaProvider
- type SnapshotAdapter
- type SnapshotRecord
- type StoredEvent
- type StreamInfo
- type StreamNotFoundError
- type StreamQueryAdapter
- type StreamSummary
- type SubscriptionAdapter
- type SubscriptionOptions
- type Transaction
- type TransactionalAdapter
Constants ¶
const (
// AnyVersion skips version checking. Use when you don't care about concurrent modifications.
AnyVersion int64 = -1
// NoStream requires the stream to not exist. Use for creating new streams.
NoStream int64 = 0
// StreamExists requires the stream to exist. Use when you expect to append to an existing stream.
StreamExists int64 = -2
)
Version constants for optimistic concurrency control. These constants define special version values used in Append operations.
Variables ¶
var (
// ErrConcurrencyConflict is returned when optimistic concurrency check fails.
ErrConcurrencyConflict = errors.New("mink: concurrency conflict")
// ErrStreamNotFound is returned when a stream does not exist.
ErrStreamNotFound = errors.New("mink: stream not found")
// ErrEmptyStreamID is returned when an empty stream ID is provided.
ErrEmptyStreamID = errors.New("mink: stream ID is required")
// ErrNoEvents is returned when attempting to append zero events.
ErrNoEvents = errors.New("mink: no events to append")
// ErrInvalidVersion is returned when an invalid version is specified.
ErrInvalidVersion = errors.New("mink: invalid version")
// ErrAdapterClosed is returned when operations are attempted on a closed adapter.
ErrAdapterClosed = errors.New("mink: adapter is closed")
// ErrSagaNotFound indicates the requested saga does not exist.
ErrSagaNotFound = errors.New("mink: saga not found")
// ErrSagaAlreadyExists indicates a saga with the same ID already exists.
ErrSagaAlreadyExists = errors.New("mink: saga already exists")
// ErrNilAggregate indicates a nil aggregate was passed.
ErrNilAggregate = errors.New("mink: nil aggregate")
// ErrOutboxMessageNotFound indicates the requested outbox message does not exist.
ErrOutboxMessageNotFound = errors.New("mink: outbox message not found")
)
Sentinel errors for adapter implementations. Adapters should return these (or errors that match via errors.Is) to enable consistent error handling across different backends.
Functions ¶
func CheckVersion ¶
CheckVersion validates the expected version against the current version. This implements the optimistic concurrency control logic shared by all adapters.
Parameters:
- streamID: The stream identifier (used for error messages)
- expected: The expected version (can be AnyVersion, NoStream, StreamExists, or a positive version)
- current: The current version of the stream
- exists: Whether the stream currently exists
Returns nil if the version check passes, or an appropriate error otherwise.
func DefaultLimit ¶
DefaultLimit returns a default limit value if the provided limit is invalid. Used for pagination in LoadFromPosition and similar methods.
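The documentation does not say what counts as an invalid limit. The sketch below assumes it means zero or negative; defaultLimit is a local, hypothetical re-creation rather than the package function.

```go
package main

import "fmt"

// defaultLimit falls back to defaultValue when limit is not positive.
// Treating limit <= 0 as "invalid" is an assumption.
func defaultLimit(limit, defaultValue int) int {
	if limit <= 0 {
		return defaultValue
	}
	return limit
}

func main() {
	fmt.Println(defaultLimit(0, 100))  // 100
	fmt.Println(defaultLimit(25, 100)) // 25
}
```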
func ExtractCategory ¶
ExtractCategory extracts the category from a stream ID. Stream IDs are expected to follow the format "Category-ID" (e.g., "Order-123"). The category is the portion before the first hyphen.
Behavior:
- "Order-123" returns "Order"
- "User-abc-def" returns "User" (only splits on first hyphen)
- "NoHyphen" returns "NoHyphen" (entire ID if no hyphen)
- "" returns "" (empty string for empty input)
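The behavior listed above can be reproduced with a single call to strings.Cut. The function below is a local sketch that follows the documented cases, not the package's own source.

```go
package main

import (
	"fmt"
	"strings"
)

// extractCategory returns everything before the first hyphen,
// or the whole ID when there is no hyphen.
func extractCategory(streamID string) string {
	category, _, _ := strings.Cut(streamID, "-")
	return category
}

func main() {
	for _, id := range []string{"Order-123", "User-abc-def", "NoHyphen", ""} {
		fmt.Printf("%q -> %q\n", id, extractCategory(id))
	}
}
```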
Types ¶
type CheckpointAdapter ¶
type CheckpointAdapter interface {
// GetCheckpoint returns the last processed position for a projection.
// Returns 0 if no checkpoint exists.
GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
// SetCheckpoint stores the last processed position for a projection.
SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
}
CheckpointAdapter manages projection checkpoints.
type ConcurrencyError ¶
ConcurrencyError provides details about a concurrency conflict. It is returned when an optimistic concurrency check fails during Append operations.
func NewConcurrencyError ¶
func NewConcurrencyError(streamID string, expected, actual int64) *ConcurrencyError
NewConcurrencyError creates a new ConcurrencyError.
func (*ConcurrencyError) Error ¶
func (e *ConcurrencyError) Error() string
Error implements the error interface.
func (*ConcurrencyError) Is ¶
func (e *ConcurrencyError) Is(target error) bool
Is implements errors.Is compatibility. Returns true when compared with ErrConcurrencyConflict.
type DiagnosticAdapter ¶
type DiagnosticAdapter interface {
// Ping checks if the database connection is healthy.
Ping(ctx context.Context) error
// GetDiagnosticInfo returns database version and connection status.
GetDiagnosticInfo(ctx context.Context) (*DiagnosticInfo, error)
// CheckSchema verifies the event store schema exists.
CheckSchema(ctx context.Context, tableName string) (*SchemaCheckResult, error)
// GetProjectionHealth returns projection health status.
GetProjectionHealth(ctx context.Context) (*ProjectionHealthResult, error)
}
DiagnosticAdapter provides diagnostic capabilities for CLI tools.
type DiagnosticInfo ¶
type DiagnosticInfo struct {
// Version is the database server version (e.g., "PostgreSQL 16.1").
Version string
// Connected indicates if the connection is healthy.
Connected bool
// Message provides additional status information.
Message string
}
DiagnosticInfo contains database diagnostic information.
type EventRecord ¶
type EventRecord struct {
// Type is the event type identifier.
Type string
// Data is the serialized event payload.
Data []byte
// Metadata contains optional contextual information.
Metadata Metadata
}
EventRecord represents an event to be appended to a stream. This is the adapter-level representation of an event.
type EventStoreAdapter ¶
type EventStoreAdapter interface {
// Append stores events to the specified stream with optimistic concurrency control.
// expectedVersion specifies the expected current version of the stream:
// - AnyVersion (-1): Skip version check
// - NoStream (0): Stream must not exist
// - StreamExists (-2): Stream must exist
// - Any positive number: Stream must be at this exact version
// Returns the stored events with their assigned positions, or an error.
Append(ctx context.Context, streamID string, events []EventRecord, expectedVersion int64) ([]StoredEvent, error)
// Load retrieves all events from a stream starting from the specified version.
// Use fromVersion=0 to load all events.
Load(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)
// GetStreamInfo returns metadata about a stream.
// Returns ErrStreamNotFound if the stream does not exist.
GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)
// GetLastPosition returns the global position of the last stored event.
// Returns 0 if no events exist.
GetLastPosition(ctx context.Context) (uint64, error)
// Initialize sets up the required database schema.
// This should be called once during application startup.
Initialize(ctx context.Context) error
// Close releases any resources held by the adapter.
Close() error
}
EventStoreAdapter is the interface that database adapters must implement. It provides the low-level operations for persisting and retrieving events.
type EventStoreStats ¶
type EventStoreStats struct {
// TotalEvents is the total number of events across all streams.
TotalEvents int64
// TotalStreams is the number of unique streams.
TotalStreams int64
// EventTypes is the number of unique event types.
EventTypes int64
// AvgEventsPerStream is the average events per stream.
AvgEventsPerStream float64
// TopEventTypes contains the most common event types.
TopEventTypes []EventTypeCount
}
EventStoreStats contains aggregate statistics about the event store.
type EventTypeCount ¶
type EventTypeCount struct {
// Type is the event type name.
Type string
// Count is the number of occurrences.
Count int64
}
EventTypeCount holds an event type and its count.
type HealthChecker ¶
type HealthChecker interface {
// Ping checks if the adapter can connect to its backend.
Ping(ctx context.Context) error
}
HealthChecker provides health check capabilities.
type IdempotencyRecord ¶
type IdempotencyRecord struct {
// Key is the idempotency key.
Key string `json:"key"`
// CommandType is the type of the processed command.
CommandType string `json:"commandType"`
// AggregateID is the ID of the affected aggregate (if any).
AggregateID string `json:"aggregateId,omitempty"`
// Version is the aggregate version after processing (if any).
Version int64 `json:"version,omitempty"`
// Response contains serialized response data (optional).
Response []byte `json:"response,omitempty"`
// Error contains the error message if the command failed.
Error string `json:"error,omitempty"`
// Success indicates if the command was processed successfully.
Success bool `json:"success"`
// ProcessedAt is when the command was processed.
ProcessedAt time.Time `json:"processedAt"`
// ExpiresAt is when the record should expire.
ExpiresAt time.Time `json:"expiresAt"`
}
IdempotencyRecord stores information about a processed command.
func CopyIdempotencyRecord ¶
func CopyIdempotencyRecord(record *IdempotencyRecord) *IdempotencyRecord
CopyIdempotencyRecord creates a deep copy of an IdempotencyRecord. This is useful to avoid external mutations of stored records.
func (*IdempotencyRecord) IsExpired ¶
func (r *IdempotencyRecord) IsExpired() bool
IsExpired returns true if the record has expired.
type IdempotencyStore ¶
type IdempotencyStore interface {
// Exists checks if a command with the given key was already processed.
Exists(ctx context.Context, key string) (bool, error)
// Store records that a command was processed.
Store(ctx context.Context, record *IdempotencyRecord) error
// Get retrieves the idempotency record for a key.
// Returns nil, nil if the key doesn't exist.
Get(ctx context.Context, key string) (*IdempotencyRecord, error)
// Delete removes an idempotency record.
Delete(ctx context.Context, key string) error
// Cleanup removes expired records.
Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
}
IdempotencyStore tracks processed commands to prevent duplicate processing. Adapters may implement this to support command idempotency.
type Metadata ¶
type Metadata struct {
// CorrelationID links related events across services.
CorrelationID string `json:"correlationId,omitempty"`
// CausationID identifies the event that caused this event.
CausationID string `json:"causationId,omitempty"`
// UserID identifies who triggered this event.
UserID string `json:"userId,omitempty"`
// TenantID for multi-tenant applications.
TenantID string `json:"tenantId,omitempty"`
// Custom holds any additional metadata.
Custom map[string]string `json:"custom,omitempty"`
}
Metadata contains event context for tracing and multi-tenancy. These fields are preserved across serialization and can be used for correlation, audit trails, and multi-tenant isolation.
type MigrationAdapter ¶
type MigrationAdapter interface {
// GetAppliedMigrations returns the list of applied migration names.
GetAppliedMigrations(ctx context.Context) ([]string, error)
// RecordMigration marks a migration as applied.
RecordMigration(ctx context.Context, name string) error
// RemoveMigrationRecord removes a migration record (for rollback).
RemoveMigrationRecord(ctx context.Context, name string) error
// ExecuteSQL runs arbitrary SQL (for applying migrations).
ExecuteSQL(ctx context.Context, sql string) error
}
MigrationAdapter provides migration management capabilities for CLI tools.
type MigrationInfo ¶
type MigrationInfo struct {
// Name is the migration identifier.
Name string
// AppliedAt is when the migration was applied (zero if pending).
AppliedAt time.Time
// Applied indicates if this migration has been run.
Applied bool
}
MigrationInfo contains information about a database migration.
type Migrator ¶
type Migrator interface {
// Migrate runs pending database migrations.
Migrate(ctx context.Context) error
// MigrationVersion returns the current migration version.
MigrationVersion(ctx context.Context) (int, error)
}
Migrator provides schema migration capabilities.
type OutboxAppender ¶
type OutboxAppender interface {
AppendWithOutbox(ctx context.Context, streamID string, events []EventRecord, expectedVersion int64, outboxMessages []*OutboxMessage) ([]StoredEvent, error)
}
OutboxAppender is an optional interface for adapters that can append events and schedule outbox messages atomically in a single transaction.
type OutboxMessage ¶
type OutboxMessage struct {
// ID is the unique message identifier.
ID string `json:"id"`
// AggregateID identifies the aggregate that produced this message.
AggregateID string `json:"aggregateId"`
// EventType is the type of event this message carries.
EventType string `json:"eventType"`
// Destination specifies where to publish (e.g., "webhook:https://...", "kafka:topic").
Destination string `json:"destination"`
// Payload is the serialized message content.
Payload []byte `json:"payload"`
// Headers contains additional key-value metadata for the message.
Headers map[string]string `json:"headers,omitempty"`
// Status is the current processing status.
Status OutboxStatus `json:"status"`
// Attempts is the number of delivery attempts made.
Attempts int `json:"attempts"`
// MaxAttempts is the maximum number of delivery attempts allowed.
MaxAttempts int `json:"maxAttempts"`
// LastError contains the error from the most recent failed attempt.
LastError string `json:"lastError,omitempty"`
// ScheduledAt is when the message was scheduled for delivery.
ScheduledAt time.Time `json:"scheduledAt"`
// LastAttemptAt is when the last delivery attempt was made.
LastAttemptAt *time.Time `json:"lastAttemptAt,omitempty"`
// ProcessedAt is when the message was successfully delivered.
ProcessedAt *time.Time `json:"processedAt,omitempty"`
// CreatedAt is when the message was created.
CreatedAt time.Time `json:"createdAt"`
}
OutboxMessage represents a message in the transactional outbox.
type OutboxStatus ¶
type OutboxStatus int
OutboxStatus represents the current status of an outbox message.
const (
// OutboxPending indicates the message is waiting to be processed.
OutboxPending OutboxStatus = iota
// OutboxProcessing indicates the message is currently being processed.
OutboxProcessing
// OutboxCompleted indicates the message was successfully published.
OutboxCompleted
// OutboxFailed indicates the message failed to publish.
OutboxFailed
// OutboxDeadLetter indicates the message exceeded max retry attempts.
OutboxDeadLetter
)
func (OutboxStatus) String ¶
func (s OutboxStatus) String() string
String returns the string representation of the outbox status.
type OutboxStore ¶
type OutboxStore interface {
// Schedule stores outbox messages for later processing.
Schedule(ctx context.Context, messages []*OutboxMessage) error
// ScheduleInTx stores outbox messages within an existing database transaction.
// The tx parameter is adapter-specific (e.g., *sql.Tx for PostgreSQL).
// This enables atomic event+outbox writes.
ScheduleInTx(ctx context.Context, tx interface{}, messages []*OutboxMessage) error
// FetchPending atomically claims up to limit pending messages for processing.
// Claimed messages are transitioned to OutboxProcessing status.
FetchPending(ctx context.Context, limit int) ([]*OutboxMessage, error)
// MarkCompleted marks messages as successfully delivered.
MarkCompleted(ctx context.Context, ids []string) error
// MarkFailed marks a message as failed with an error description.
MarkFailed(ctx context.Context, id string, lastErr error) error
// RetryFailed resets eligible failed messages (below maxAttempts) to pending.
RetryFailed(ctx context.Context, maxAttempts int) (int64, error)
// MoveToDeadLetter transitions messages that exceeded maxAttempts to dead letter.
MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)
// GetDeadLetterMessages retrieves dead-lettered messages.
GetDeadLetterMessages(ctx context.Context, limit int) ([]*OutboxMessage, error)
// Cleanup removes old completed messages.
Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
// Initialize sets up the required storage schema.
Initialize(ctx context.Context) error
// Close releases any resources held by the store.
Close() error
}
OutboxStore defines the interface for outbox message persistence.
type ProjectionHealthResult ¶
type ProjectionHealthResult struct {
// TotalProjections is the number of registered projections.
TotalProjections int64
// ProjectionsBehind is the number of projections that are behind.
ProjectionsBehind int64
// MaxPosition is the highest global position in the event store.
MaxPosition int64
// Message provides additional information.
Message string
}
ProjectionHealthResult contains projection health information.
type ProjectionInfo ¶
type ProjectionInfo struct {
// Name is the projection identifier.
Name string
// Position is the last processed global position.
Position int64
// Status is the projection state (active, paused, etc.).
Status string
// UpdatedAt is when the projection was last updated.
UpdatedAt time.Time
}
ProjectionInfo contains projection status information.
type ProjectionQueryAdapter ¶
type ProjectionQueryAdapter interface {
// ListProjections returns all registered projections.
ListProjections(ctx context.Context) ([]ProjectionInfo, error)
// GetProjection returns information about a specific projection.
// Returns nil, nil if the projection doesn't exist.
GetProjection(ctx context.Context, name string) (*ProjectionInfo, error)
// SetProjectionStatus updates a projection's status (active, paused).
SetProjectionStatus(ctx context.Context, name string, status string) error
// ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.
ResetProjectionCheckpoint(ctx context.Context, name string) error
// GetTotalEventCount returns the highest global position (for progress display).
GetTotalEventCount(ctx context.Context) (int64, error)
}
ProjectionQueryAdapter provides projection management capabilities for CLI tools.
type SagaNotFoundError ¶
SagaNotFoundError provides detailed information about a missing saga.
func (*SagaNotFoundError) Error ¶
func (e *SagaNotFoundError) Error() string
Error returns the error message.
func (*SagaNotFoundError) Is ¶
func (e *SagaNotFoundError) Is(target error) bool
Is reports whether this error matches the target error.
func (*SagaNotFoundError) Unwrap ¶
func (e *SagaNotFoundError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap().
type SagaState ¶
type SagaState struct {
// ID is the unique saga identifier.
ID string `json:"id"`
// Type is the saga type.
Type string `json:"type"`
// CorrelationID links this saga to related events/commands.
CorrelationID string `json:"correlationId,omitempty"`
// Status is the current saga status.
Status SagaStatus `json:"status"`
// CurrentStep is the current step number.
CurrentStep int `json:"currentStep"`
// Data contains the saga's internal state.
Data map[string]interface{} `json:"data,omitempty"`
// ProcessedEvents contains event keys that have been processed by this saga.
// This is used internally by the SagaManager for idempotency tracking and
// should not be modified directly by saga implementations.
// Each entry is formatted as "eventID:globalPosition".
ProcessedEvents []string `json:"processedEvents,omitempty"`
// Steps contains the history of executed steps.
Steps []SagaStep `json:"steps,omitempty"`
// StartedAt is when the saga started.
StartedAt time.Time `json:"startedAt"`
// UpdatedAt is when the saga was last updated.
UpdatedAt time.Time `json:"updatedAt"`
// CompletedAt is when the saga completed (nil if not completed).
CompletedAt *time.Time `json:"completedAt,omitempty"`
// FailureReason contains the error message if the saga failed.
FailureReason string `json:"failureReason,omitempty"`
// Version for optimistic concurrency control.
// Version 0 indicates a new saga that has not been saved yet.
// After each successful save, the version is incremented by 1.
// When updating an existing saga, the version must match the current
// stored version; otherwise, ErrConcurrencyConflict is returned.
Version int64 `json:"version"`
}
SagaState represents the persisted state of a saga. This is the data structure stored by the SagaStore.
func (*SagaState) IsTerminal ¶
IsTerminal returns true if the saga is in a terminal state.
type SagaStatus ¶
type SagaStatus int
SagaStatus represents the current status of a saga.
const (
// SagaStatusStarted indicates the saga has started but not completed.
SagaStatusStarted SagaStatus = iota
// SagaStatusRunning indicates the saga is actively processing.
SagaStatusRunning
// SagaStatusCompleted indicates the saga completed successfully.
SagaStatusCompleted
// SagaStatusFailed indicates the saga failed without compensation.
SagaStatusFailed
// SagaStatusCompensating indicates the saga is executing compensating actions.
SagaStatusCompensating
// SagaStatusCompensated indicates the saga has been compensated after failure.
SagaStatusCompensated
// SagaStatusCompensationFailed indicates compensation failed (partial rollback).
SagaStatusCompensationFailed
)
func (SagaStatus) IsTerminal ¶
func (s SagaStatus) IsTerminal() bool
IsTerminal returns true if the saga status is a terminal state.
func (SagaStatus) String ¶
func (s SagaStatus) String() string
String returns the string representation of the saga status.
type SagaStep ¶
type SagaStep struct {
// Name is the human-readable name of the step.
Name string `json:"name"`
// Index is the step number (0-based).
Index int `json:"index"`
// Status is the current status of this step.
Status SagaStepStatus `json:"status"`
// Command is the command that was executed for this step.
Command string `json:"command,omitempty"`
// CompletedAt is when this step completed.
CompletedAt *time.Time `json:"completedAt,omitempty"`
// Error contains any error message if the step failed.
Error string `json:"error,omitempty"`
}
SagaStep represents a single step in a saga.
type SagaStepStatus ¶
type SagaStepStatus int
SagaStepStatus represents the status of a saga step.
const (
// SagaStepPending indicates the step has not started.
SagaStepPending SagaStepStatus = iota
// SagaStepRunning indicates the step is in progress.
SagaStepRunning
// SagaStepCompleted indicates the step completed successfully.
SagaStepCompleted
// SagaStepFailed indicates the step failed.
SagaStepFailed
// SagaStepCompensated indicates the step was compensated.
SagaStepCompensated
)
func (SagaStepStatus) String ¶
func (s SagaStepStatus) String() string
String returns the string representation of the step status.
type SagaStore ¶
type SagaStore interface {
// Save persists a saga state with optimistic concurrency control.
//
// Version semantics:
// - Version 0: Creates a new saga. Some implementations return an error
// if the saga already exists.
// - Version > 0: Updates an existing saga. The version must match the
// current stored version, otherwise ErrConcurrencyConflict is returned.
//
// After a successful save, state.Version is incremented to reflect the
// new version stored in the database.
Save(ctx context.Context, state *SagaState) error
// Load retrieves a saga state by ID.
// Returns ErrSagaNotFound if the saga doesn't exist.
Load(ctx context.Context, sagaID string) (*SagaState, error)
// FindByCorrelationID finds a saga by its correlation ID.
// Returns ErrSagaNotFound if no saga is found.
FindByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)
// FindByType finds all sagas of a given type that are in any of the specified statuses.
// If statuses is empty, returns sagas of all statuses.
FindByType(ctx context.Context, sagaType string, statuses ...SagaStatus) ([]*SagaState, error)
// Delete removes a saga state.
// Returns ErrSagaNotFound if the saga doesn't exist.
Delete(ctx context.Context, sagaID string) error
// Close releases any resources held by the store.
Close() error
}
SagaStore defines the interface for saga persistence.
type SchemaCheckResult ¶
type SchemaCheckResult struct {
// TableExists indicates if the events table exists.
TableExists bool
// EventCount is the number of events in the store.
EventCount int64
// Message provides additional information.
Message string
}
SchemaCheckResult contains information about the event store schema.
type SchemaProvider ¶
type SchemaProvider interface {
// GenerateSchema returns the DDL for the event store schema.
// tableName is the events table name.
// snapshotTableName is the snapshots table name.
// outboxTableName is the outbox table name.
GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
}
SchemaProvider generates database-specific schema SQL.
type SnapshotAdapter ¶
type SnapshotAdapter interface {
// SaveSnapshot stores a snapshot for the given stream.
SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
// LoadSnapshot retrieves the latest snapshot for the given stream.
// Returns nil, nil if no snapshot exists.
LoadSnapshot(ctx context.Context, streamID string) (*SnapshotRecord, error)
// DeleteSnapshot removes the snapshot for the given stream.
DeleteSnapshot(ctx context.Context, streamID string) error
}
SnapshotAdapter stores aggregate snapshots for faster loading.
type SnapshotRecord ¶
type SnapshotRecord struct {
// StreamID is the stream identifier.
StreamID string
// Version is the aggregate version at the time of the snapshot.
Version int64
// Data is the serialized snapshot payload.
Data []byte
}
SnapshotRecord represents a stored aggregate snapshot.
type StoredEvent ¶
type StoredEvent struct {
// ID is the unique event identifier.
ID string
// StreamID is the stream this event belongs to.
StreamID string
// Type is the event type identifier.
Type string
// Data is the serialized event payload.
Data []byte
// Metadata contains contextual information.
Metadata Metadata
// Version is the position within the stream (1-based).
Version int64
// GlobalPosition is the global ordering position across all streams.
GlobalPosition uint64
// Timestamp is when the event was stored.
Timestamp time.Time
}
StoredEvent represents a persisted event with its storage metadata. This is returned when loading events from the store.
type StreamInfo ¶
type StreamInfo struct {
// StreamID is the stream identifier.
StreamID string
// Category is the aggregate type (first part of stream ID).
Category string
// Version is the current stream version.
Version int64
// EventCount is the number of events in the stream.
EventCount int64
// CreatedAt is when the first event was stored.
CreatedAt time.Time
// UpdatedAt is when the last event was stored.
UpdatedAt time.Time
}
StreamInfo contains metadata about an event stream.
type StreamNotFoundError ¶
type StreamNotFoundError struct {
StreamID string
}
StreamNotFoundError provides details about a missing stream. It is returned when an operation requires an existing stream that doesn't exist.
func NewStreamNotFoundError ¶
func NewStreamNotFoundError(streamID string) *StreamNotFoundError
NewStreamNotFoundError creates a new StreamNotFoundError.
func (*StreamNotFoundError) Error ¶
func (e *StreamNotFoundError) Error() string
Error implements the error interface.
func (*StreamNotFoundError) Is ¶
func (e *StreamNotFoundError) Is(target error) bool
Is implements errors.Is compatibility. Returns true when compared with ErrStreamNotFound.
type StreamQueryAdapter ¶
type StreamQueryAdapter interface {
// ListStreams returns a list of stream summaries.
// prefix filters streams by ID prefix (empty string for all).
// limit caps the number of results (0 for unlimited).
ListStreams(ctx context.Context, prefix string, limit int) ([]StreamSummary, error)
// GetStreamEvents returns events from a stream with pagination.
// fromVersion starts at this version (0 for beginning).
// limit caps the number of events returned.
GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]StoredEvent, error)
// GetEventStoreStats returns aggregate statistics about the event store.
GetEventStoreStats(ctx context.Context) (*EventStoreStats, error)
}
StreamQueryAdapter provides stream inspection capabilities for CLI tools. This allows querying streams without direct SQL access.
type StreamSummary ¶
type StreamSummary struct {
// StreamID is the stream identifier.
StreamID string
// EventCount is the number of events in the stream.
EventCount int64
// LastEventType is the type of the most recent event.
LastEventType string
// LastUpdated is when the last event was stored.
LastUpdated time.Time
}
StreamSummary contains summary information about a stream for listing.
type SubscriptionAdapter ¶
type SubscriptionAdapter interface {
// LoadFromPosition loads events starting from a global position.
// This is used by projection engines to catch up on historical events.
LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)
// SubscribeAll subscribes to all events across all streams.
// Events are delivered starting from the specified global position.
// Optional SubscriptionOptions can be provided to configure behavior.
SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)
// SubscribeStream subscribes to events from a specific stream.
// Events are delivered starting from the specified version.
// Optional SubscriptionOptions can be provided to configure behavior.
SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)
// SubscribeCategory subscribes to all events from streams in a category.
// Events are delivered starting from the specified global position.
// Optional SubscriptionOptions can be provided to configure behavior.
SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (<-chan StoredEvent, error)
}
SubscriptionAdapter provides event subscription capabilities. Adapters may optionally implement this interface for real-time event streaming.
type SubscriptionOptions ¶
type SubscriptionOptions struct {
// BufferSize is the size of the event channel buffer.
// Default: 100
BufferSize int
// PollInterval is how often to poll for new events (for polling-based adapters).
// Default: 100ms
PollInterval time.Duration
// OnError is called when an error occurs during subscription.
// If nil, errors may be logged or silently retried depending on the adapter.
OnError func(err error)
}
SubscriptionOptions configures subscription behavior. Adapters may support additional options beyond these common ones.
type Transaction ¶
type Transaction interface {
// Commit commits the transaction.
Commit() error
// Rollback aborts the transaction.
Rollback() error
// Adapter returns an adapter that operates within this transaction.
Adapter() EventStoreAdapter
}
Transaction represents a database transaction.
type TransactionalAdapter ¶
type TransactionalAdapter interface {
// BeginTx starts a new transaction.
BeginTx(ctx context.Context) (Transaction, error)
}
TransactionalAdapter provides transaction support. Adapters may optionally implement this for atomic operations.
Directories ¶
| Path | Synopsis |
|---|---|
| memory | Package memory provides an in-memory implementation of the event store adapter. |
| postgres | Package postgres provides a PostgreSQL implementation of the event store adapter. |