Documentation
¶
Overview ¶
Package memory provides an in-memory implementation of the event store adapter. This adapter is primarily intended for testing and development purposes.
Index ¶
- Constants
- Variables
- type Checkpoint
- type CheckpointStore
- func (s *CheckpointStore) Clear()
- func (s *CheckpointStore) DeleteCheckpoint(_ context.Context, projectionName string) error
- func (s *CheckpointStore) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
- func (s *CheckpointStore) GetCheckpoint(_ context.Context, projectionName string) (uint64, error)
- func (s *CheckpointStore) GetCheckpointWithTimestamp(ctx context.Context, projectionName string) (*Checkpoint, error)
- func (s *CheckpointStore) Len() int
- func (s *CheckpointStore) SetCheckpoint(_ context.Context, projectionName string, position uint64) error
- type ConcurrencyError
- type IdempotencyStore
- func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *IdempotencyStore) Clear()
- func (s *IdempotencyStore) Close() error
- func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
- func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)
- func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
- func (s *IdempotencyStore) Len() int
- func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
- type IdempotencyStoreOption
- type MemoryAdapter
- func (a *MemoryAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, ...) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
- func (a *MemoryAdapter) Close() error
- func (a *MemoryAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
- func (a *MemoryAdapter) EventCount() int
- func (a *MemoryAdapter) ExecuteSQL(ctx context.Context, sql string) error
- func (a *MemoryAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
- func (a *MemoryAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
- func (a *MemoryAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
- func (a *MemoryAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
- func (a *MemoryAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
- func (a *MemoryAdapter) GetLastPosition(ctx context.Context) (uint64, error)
- func (a *MemoryAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
- func (a *MemoryAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
- func (a *MemoryAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
- func (a *MemoryAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
- func (a *MemoryAdapter) Initialize(ctx context.Context) error
- func (a *MemoryAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
- func (a *MemoryAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
- func (a *MemoryAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
- func (a *MemoryAdapter) Ping(ctx context.Context) error
- func (a *MemoryAdapter) RecordMigration(ctx context.Context, name string) error
- func (a *MemoryAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
- func (a *MemoryAdapter) Reset()
- func (a *MemoryAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
- func (a *MemoryAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
- func (a *MemoryAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
- func (a *MemoryAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error
- func (a *MemoryAdapter) StreamCount() int
- func (a *MemoryAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
- func (a *MemoryAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, ...) (<-chan adapters.StoredEvent, error)
- func (a *MemoryAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, ...) (<-chan adapters.StoredEvent, error)
- type Option
- type OutboxStore
- func (s *OutboxStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *OutboxStore) Clear()
- func (s *OutboxStore) Close() error
- func (s *OutboxStore) Count() int
- func (s *OutboxStore) CountByStatus() map[adapters.OutboxStatus]int
- func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) Initialize(ctx context.Context) error
- func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
- func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error
- func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
- func (s *OutboxStore) ScheduleInTx(ctx context.Context, _ interface{}, messages []*adapters.OutboxMessage) error
- type SagaStore
- func (s *SagaStore) All() []*adapters.SagaState
- func (s *SagaStore) Clear()
- func (s *SagaStore) Close() error
- func (s *SagaStore) Count() int
- func (s *SagaStore) CountByStatus(ctx context.Context) (map[adapters.SagaStatus]int64, error)
- func (s *SagaStore) Delete(ctx context.Context, sagaID string) error
- func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*adapters.SagaState, error)
- func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...adapters.SagaStatus) ([]*adapters.SagaState, error)
- func (s *SagaStore) Load(ctx context.Context, sagaID string) (*adapters.SagaState, error)
- func (s *SagaStore) Save(ctx context.Context, state *adapters.SagaState) error
- type StreamNotFoundError
Constants ¶
const (
	AnyVersion   = adapters.AnyVersion
	NoStream     = adapters.NoStream
	StreamExists = adapters.StreamExists
)
Version constants for optimistic concurrency control. These are re-exported from the adapters package for convenience.
Variables ¶
var (
	// ErrAdapterClosed is returned when an operation is attempted on a closed adapter.
	ErrAdapterClosed = adapters.ErrAdapterClosed

	// ErrEmptyStreamID is returned when an empty stream ID is provided.
	ErrEmptyStreamID = adapters.ErrEmptyStreamID

	// ErrNoEvents is returned when attempting to append zero events.
	ErrNoEvents = adapters.ErrNoEvents

	// ErrConcurrencyConflict is returned when optimistic concurrency check fails.
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict

	// ErrStreamNotFound is returned when a stream does not exist.
	ErrStreamNotFound = adapters.ErrStreamNotFound

	// ErrInvalidVersion is returned when an invalid version is specified.
	ErrInvalidVersion = adapters.ErrInvalidVersion
)
Sentinel errors for the memory adapter. These are aliases to the adapters package errors for compatibility with errors.Is().
var NewConcurrencyError = adapters.NewConcurrencyError
NewConcurrencyError is an alias for adapters.NewConcurrencyError for backward compatibility.
var NewStreamNotFoundError = adapters.NewStreamNotFoundError
NewStreamNotFoundError is an alias for adapters.NewStreamNotFoundError for backward compatibility.
Functions ¶
This section is empty.
Types ¶
type Checkpoint ¶
Checkpoint represents a stored checkpoint.
type CheckpointStore ¶
type CheckpointStore struct {
// contains filtered or unexported fields
}
CheckpointStore provides an in-memory implementation of CheckpointAdapter.
func NewCheckpointStore ¶
func NewCheckpointStore() *CheckpointStore
NewCheckpointStore creates a new in-memory checkpoint store.
func (*CheckpointStore) DeleteCheckpoint ¶
func (s *CheckpointStore) DeleteCheckpoint(_ context.Context, projectionName string) error
DeleteCheckpoint removes a checkpoint for a projection.
func (*CheckpointStore) GetAllCheckpoints ¶
GetAllCheckpoints returns all stored checkpoints.
func (*CheckpointStore) GetCheckpoint ¶
GetCheckpoint retrieves the current position for a projection. Returns 0 if no checkpoint exists.
func (*CheckpointStore) GetCheckpointWithTimestamp ¶
func (s *CheckpointStore) GetCheckpointWithTimestamp(ctx context.Context, projectionName string) (*Checkpoint, error)
GetCheckpointWithTimestamp retrieves the checkpoint with its last update time.
func (*CheckpointStore) Len ¶
func (s *CheckpointStore) Len() int
Len returns the number of checkpoints.
func (*CheckpointStore) SetCheckpoint ¶
func (s *CheckpointStore) SetCheckpoint(_ context.Context, projectionName string, position uint64) error
SetCheckpoint stores the position for a projection.
type ConcurrencyError ¶
type ConcurrencyError = adapters.ConcurrencyError
ConcurrencyError is an alias for adapters.ConcurrencyError for backward compatibility.
type IdempotencyStore ¶
type IdempotencyStore struct {
// contains filtered or unexported fields
}
IdempotencyStore provides an in-memory implementation of adapters.IdempotencyStore. It is useful for testing and development but should not be used in production as it does not persist data across restarts.
func NewIdempotencyStore ¶
func NewIdempotencyStore(opts ...IdempotencyStoreOption) *IdempotencyStore
NewIdempotencyStore creates a new in-memory IdempotencyStore.
func (*IdempotencyStore) Cleanup ¶
Cleanup removes records older than the specified duration. Returns the number of records deleted.
func (*IdempotencyStore) Clear ¶
func (s *IdempotencyStore) Clear()
Clear removes all records from the store. Useful for testing.
func (*IdempotencyStore) Close ¶
func (s *IdempotencyStore) Close() error
Close stops the cleanup goroutine and releases resources. It is safe to call Close() multiple times.
func (*IdempotencyStore) Delete ¶
func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
Delete removes an idempotency record by key.
func (*IdempotencyStore) Exists ¶
Exists checks if a record with the given key exists and is not expired.
func (*IdempotencyStore) Get ¶
func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
Get retrieves an idempotency record by key. Returns nil if the record doesn't exist or is expired.
func (*IdempotencyStore) Len ¶
func (s *IdempotencyStore) Len() int
Len returns the number of records in the store. Useful for testing.
func (*IdempotencyStore) Store ¶
func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
Store saves a new idempotency record.
type IdempotencyStoreOption ¶
type IdempotencyStoreOption func(*IdempotencyStore)
IdempotencyStoreOption configures an IdempotencyStore.
func WithCleanupInterval ¶
func WithCleanupInterval(interval time.Duration) IdempotencyStoreOption
WithCleanupInterval sets the interval for automatic cleanup. Set to 0 to disable automatic cleanup.
func WithMaxAge ¶
func WithMaxAge(maxAge time.Duration) IdempotencyStoreOption
WithMaxAge sets the maximum age for records. Records older than this will be cleaned up.
type MemoryAdapter ¶
type MemoryAdapter struct {
// contains filtered or unexported fields
}
MemoryAdapter is an in-memory implementation of EventStoreAdapter. It is thread-safe and suitable for unit testing.
func NewAdapter ¶
func NewAdapter(opts ...Option) *MemoryAdapter
NewAdapter creates a new in-memory event store adapter.
func (*MemoryAdapter) Append ¶
func (a *MemoryAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64) ([]adapters.StoredEvent, error)
Append stores events to the specified stream with optimistic concurrency control.
func (*MemoryAdapter) CheckSchema ¶
func (a *MemoryAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
CheckSchema verifies the event store "schema", which always exists for the memory adapter.
func (*MemoryAdapter) Close ¶
func (a *MemoryAdapter) Close() error
Close releases any resources held by the adapter.
func (*MemoryAdapter) DeleteSnapshot ¶
func (a *MemoryAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
DeleteSnapshot removes the snapshot for the given stream.
func (*MemoryAdapter) EventCount ¶
func (a *MemoryAdapter) EventCount() int
EventCount returns the total number of events stored.
func (*MemoryAdapter) ExecuteSQL ¶
func (a *MemoryAdapter) ExecuteSQL(ctx context.Context, sql string) error
ExecuteSQL is a no-op for the memory adapter (no SQL to execute).
func (*MemoryAdapter) GenerateSchema ¶
func (a *MemoryAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
GenerateSchema returns an informational message for the memory adapter.
func (*MemoryAdapter) GetAppliedMigrations ¶
func (a *MemoryAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
GetAppliedMigrations returns the list of applied migration names.
func (*MemoryAdapter) GetCheckpoint ¶
GetCheckpoint returns the last processed position for a projection.
func (*MemoryAdapter) GetDiagnosticInfo ¶
func (a *MemoryAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
GetDiagnosticInfo returns diagnostic information for the memory adapter.
func (*MemoryAdapter) GetEventStoreStats ¶
func (a *MemoryAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
GetEventStoreStats returns aggregate statistics about the event store.
func (*MemoryAdapter) GetLastPosition ¶
func (a *MemoryAdapter) GetLastPosition(ctx context.Context) (uint64, error)
GetLastPosition returns the global position of the last stored event.
func (*MemoryAdapter) GetProjection ¶
func (a *MemoryAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
GetProjection returns information about a specific projection.
func (*MemoryAdapter) GetProjectionHealth ¶
func (a *MemoryAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
GetProjectionHealth returns projection health status for memory adapter.
func (*MemoryAdapter) GetStreamEvents ¶
func (a *MemoryAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
GetStreamEvents returns events from a stream with pagination for CLI display.
func (*MemoryAdapter) GetStreamInfo ¶
func (a *MemoryAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
GetStreamInfo returns metadata about a stream.
func (*MemoryAdapter) GetTotalEventCount ¶
func (a *MemoryAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
GetTotalEventCount returns the highest global position.
func (*MemoryAdapter) Initialize ¶
func (a *MemoryAdapter) Initialize(ctx context.Context) error
Initialize is a no-op for the memory adapter.
func (*MemoryAdapter) ListProjections ¶
func (a *MemoryAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
ListProjections returns all registered projections.
func (*MemoryAdapter) ListStreams ¶
func (a *MemoryAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
ListStreams returns a list of stream summaries for CLI display.
func (*MemoryAdapter) Load ¶
func (a *MemoryAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
Load retrieves all events from a stream starting from the specified version.
func (*MemoryAdapter) LoadFromPosition ¶
func (a *MemoryAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
LoadFromPosition loads events starting from a global position. This is used by projection engines to catch up on historical events.
func (*MemoryAdapter) LoadSnapshot ¶
func (a *MemoryAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
LoadSnapshot retrieves the latest snapshot for the given stream.
func (*MemoryAdapter) Ping ¶
func (a *MemoryAdapter) Ping(ctx context.Context) error
Ping checks if the adapter is healthy.
func (*MemoryAdapter) RecordMigration ¶
func (a *MemoryAdapter) RecordMigration(ctx context.Context, name string) error
RecordMigration marks a migration as applied.
func (*MemoryAdapter) RemoveMigrationRecord ¶
func (a *MemoryAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
RemoveMigrationRecord removes a migration record (for rollback).
func (*MemoryAdapter) Reset ¶
func (a *MemoryAdapter) Reset()
Reset clears all data. Useful for testing.
func (*MemoryAdapter) ResetProjectionCheckpoint ¶
func (a *MemoryAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.
func (*MemoryAdapter) SaveSnapshot ¶
func (a *MemoryAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
SaveSnapshot stores a snapshot for the given stream.
func (*MemoryAdapter) SetCheckpoint ¶
func (a *MemoryAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
SetCheckpoint stores the last processed position for a projection.
func (*MemoryAdapter) SetProjectionStatus ¶
SetProjectionStatus updates a projection's status.
func (*MemoryAdapter) StreamCount ¶
func (a *MemoryAdapter) StreamCount() int
StreamCount returns the number of streams.
func (*MemoryAdapter) SubscribeAll ¶
func (a *MemoryAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeAll subscribes to all events across all streams.
func (*MemoryAdapter) SubscribeCategory ¶
func (a *MemoryAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeCategory subscribes to all events from streams in a category.
func (*MemoryAdapter) SubscribeStream ¶
func (a *MemoryAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeStream subscribes to events from a specific stream.
type OutboxStore ¶
type OutboxStore struct {
// contains filtered or unexported fields
}
OutboxStore provides an in-memory implementation of adapters.OutboxStore. This is primarily intended for testing and development purposes. Note: ScheduleInTx ignores the tx parameter since in-memory storage has no real transactions.
func NewOutboxStore ¶
func NewOutboxStore() *OutboxStore
NewOutboxStore creates a new in-memory OutboxStore.
func (*OutboxStore) Clear ¶
func (s *OutboxStore) Clear()
Clear removes all messages (useful for testing).
func (*OutboxStore) Close ¶
func (s *OutboxStore) Close() error
Close is a no-op for the in-memory store.
func (*OutboxStore) Count ¶
func (s *OutboxStore) Count() int
Count returns the total number of messages stored.
func (*OutboxStore) CountByStatus ¶
func (s *OutboxStore) CountByStatus() map[adapters.OutboxStatus]int
CountByStatus returns the count of messages by status.
func (*OutboxStore) FetchPending ¶
func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
FetchPending atomically claims up to limit pending messages for processing.
func (*OutboxStore) GetDeadLetterMessages ¶
func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
GetDeadLetterMessages retrieves dead-lettered messages.
func (*OutboxStore) Initialize ¶
func (s *OutboxStore) Initialize(ctx context.Context) error
Initialize is a no-op for the in-memory store.
func (*OutboxStore) MarkCompleted ¶
func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
MarkCompleted marks messages as successfully delivered.
func (*OutboxStore) MarkFailed ¶
MarkFailed marks a message as failed with an error description.
func (*OutboxStore) MoveToDeadLetter ¶
MoveToDeadLetter transitions messages that have exceeded their per-message max_attempts or the global maxAttempts to dead letter.
func (*OutboxStore) RetryFailed ¶
RetryFailed resets eligible failed messages (below per-message max_attempts or global maxAttempts) to pending.
func (*OutboxStore) Schedule ¶
func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
Schedule stores outbox messages for later processing.
func (*OutboxStore) ScheduleInTx ¶
func (s *OutboxStore) ScheduleInTx(ctx context.Context, _ interface{}, messages []*adapters.OutboxMessage) error
ScheduleInTx stores outbox messages (tx parameter is ignored for in-memory store).
type SagaStore ¶
type SagaStore struct {
// contains filtered or unexported fields
}
SagaStore provides an in-memory implementation of adapters.SagaStore. This is primarily intended for testing and development purposes.
func (*SagaStore) CountByStatus ¶
CountByStatus returns the count of sagas by status.
func (*SagaStore) FindByCorrelationID ¶
func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*adapters.SagaState, error)
FindByCorrelationID finds a saga by its correlation ID.
func (*SagaStore) FindByType ¶
func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...adapters.SagaStatus) ([]*adapters.SagaState, error)
FindByType finds all sagas of a given type with the specified statuses.
func (*SagaStore) Save ¶
Save persists a saga state with optimistic concurrency control.
Version semantics:
- Version 0: Creates a new saga. If a saga with this ID already exists, it returns a ConcurrencyError.
- Version > 0: Updates an existing saga. If no saga exists with this ID, it returns SagaNotFoundError. If the version doesn't match, it returns ConcurrencyError.
After a successful save, state.Version is incremented to reflect the new version.
type StreamNotFoundError ¶
type StreamNotFoundError = adapters.StreamNotFoundError
StreamNotFoundError is an alias for adapters.StreamNotFoundError for backward compatibility.