memory

package
v1.0.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package memory provides an in-memory implementation of the event store adapter. This adapter is primarily intended for testing and development purposes.

Index

Constants

View Source
const (
	AnyVersion   = adapters.AnyVersion
	NoStream     = adapters.NoStream
	StreamExists = adapters.StreamExists
)

Version constants for optimistic concurrency control. These are re-exported from the adapters package for convenience.

Variables

View Source
var (
	// ErrAdapterClosed is returned when an operation is attempted on a closed adapter.
	ErrAdapterClosed = adapters.ErrAdapterClosed

	// ErrEmptyStreamID is returned when an empty stream ID is provided.
	ErrEmptyStreamID = adapters.ErrEmptyStreamID

	// ErrNoEvents is returned when attempting to append zero events.
	ErrNoEvents = adapters.ErrNoEvents

	// ErrConcurrencyConflict is returned when optimistic concurrency check fails.
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict

	// ErrStreamNotFound is returned when a stream does not exist.
	ErrStreamNotFound = adapters.ErrStreamNotFound

	// ErrInvalidVersion is returned when an invalid version is specified.
	ErrInvalidVersion = adapters.ErrInvalidVersion
)

Sentinel errors for the memory adapter. These are aliases to the adapters package errors for compatibility with errors.Is().

View Source
var NewConcurrencyError = adapters.NewConcurrencyError

NewConcurrencyError is an alias for adapters.NewConcurrencyError for backward compatibility.

View Source
var NewStreamNotFoundError = adapters.NewStreamNotFoundError

NewStreamNotFoundError is an alias for adapters.NewStreamNotFoundError for backward compatibility.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	ProjectionName string
	Position       uint64
	UpdatedAt      time.Time
}

Checkpoint represents a stored checkpoint.

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore provides an in-memory implementation of CheckpointAdapter.

func NewCheckpointStore

func NewCheckpointStore() *CheckpointStore

NewCheckpointStore creates a new in-memory checkpoint store.

func (*CheckpointStore) Clear

func (s *CheckpointStore) Clear()

Clear removes all checkpoints.

func (*CheckpointStore) DeleteCheckpoint

func (s *CheckpointStore) DeleteCheckpoint(_ context.Context, projectionName string) error

DeleteCheckpoint removes a checkpoint for a projection.

func (*CheckpointStore) GetAllCheckpoints

func (s *CheckpointStore) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)

GetAllCheckpoints returns all stored checkpoints.

func (*CheckpointStore) GetCheckpoint

func (s *CheckpointStore) GetCheckpoint(_ context.Context, projectionName string) (uint64, error)

GetCheckpoint retrieves the current position for a projection. Returns 0 if no checkpoint exists.

func (*CheckpointStore) GetCheckpointWithTimestamp

func (s *CheckpointStore) GetCheckpointWithTimestamp(ctx context.Context, projectionName string) (*Checkpoint, error)

GetCheckpointWithTimestamp retrieves the checkpoint with its last update time.

func (*CheckpointStore) Len

func (s *CheckpointStore) Len() int

Len returns the number of checkpoints.

func (*CheckpointStore) SetCheckpoint

func (s *CheckpointStore) SetCheckpoint(_ context.Context, projectionName string, position uint64) error

SetCheckpoint stores the position for a projection.

type ConcurrencyError

type ConcurrencyError = adapters.ConcurrencyError

ConcurrencyError is an alias for adapters.ConcurrencyError for backward compatibility.

type IdempotencyStore

type IdempotencyStore struct {
	// contains filtered or unexported fields
}

IdempotencyStore provides an in-memory implementation of adapters.IdempotencyStore. It is useful for testing and development but should not be used in production as it does not persist data across restarts.

func NewIdempotencyStore

func NewIdempotencyStore(opts ...IdempotencyStoreOption) *IdempotencyStore

NewIdempotencyStore creates a new in-memory IdempotencyStore.

func (*IdempotencyStore) Cleanup

func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

Cleanup removes records older than the specified duration. Returns the number of records deleted.

func (*IdempotencyStore) Clear

func (s *IdempotencyStore) Clear()

Clear removes all records from the store. Useful for testing.

func (*IdempotencyStore) Close

func (s *IdempotencyStore) Close() error

Close stops the cleanup goroutine and releases resources. It is safe to call Close() multiple times.

func (*IdempotencyStore) Delete

func (s *IdempotencyStore) Delete(ctx context.Context, key string) error

Delete removes an idempotency record by key.

func (*IdempotencyStore) Exists

func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a record with the given key exists and is not expired.

func (*IdempotencyStore) Get

Get retrieves an idempotency record by key. Returns nil if the record doesn't exist or is expired.

func (*IdempotencyStore) Len

func (s *IdempotencyStore) Len() int

Len returns the number of records in the store. Useful for testing.

func (*IdempotencyStore) Store

Store saves a new idempotency record.

func (*IdempotencyStore) StoreIfAbsent added in v1.0.25

func (s *IdempotencyStore) StoreIfAbsent(ctx context.Context, record *adapters.IdempotencyRecord) (bool, error)

StoreIfAbsent atomically stores the record only if no live (non-expired) record exists for its key. An expired record is treated as absent and overwritten. Returns true if the record was stored.

type IdempotencyStoreOption

type IdempotencyStoreOption func(*IdempotencyStore)

IdempotencyStoreOption configures an IdempotencyStore

func WithCleanupInterval

func WithCleanupInterval(interval time.Duration) IdempotencyStoreOption

WithCleanupInterval sets the interval for automatic cleanup. Set to 0 to disable automatic cleanup.

func WithMaxAge

func WithMaxAge(maxAge time.Duration) IdempotencyStoreOption

WithMaxAge sets the maximum age for records. Records older than this will be cleaned up.

type MemoryAdapter

type MemoryAdapter struct {
	// contains filtered or unexported fields
}

MemoryAdapter is an in-memory implementation of EventStoreAdapter. It is thread-safe and suitable for unit testing.

func NewAdapter

func NewAdapter(opts ...Option) *MemoryAdapter

NewAdapter creates a new in-memory event store adapter.

func (*MemoryAdapter) Append

func (a *MemoryAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64) ([]adapters.StoredEvent, error)

Append stores events to the specified stream with optimistic concurrency control.

func (*MemoryAdapter) AppendWithOutbox added in v1.0.25

func (a *MemoryAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64, outbox adapters.OutboxStore, outboxMessages []*adapters.OutboxMessage) ([]adapters.StoredEvent, error)

AppendWithOutbox atomically appends events and schedules outbox messages into the provided store, all under the adapter's write lock.

The append is validated and the messages scheduled BEFORE any events are written, so a version conflict or a scheduling failure leaves neither the events nor the messages — mirroring the single-transaction guarantee of the PostgreSQL adapter. Messages are scheduled into the caller-provided store (the one configured on EventStoreWithOutbox), keeping the atomic path consistent with the store the caller reads from.

Scheduling runs while a.mu is held, so the provided store must not call back into this adapter; the in-process memory OutboxStore does not.

func (*MemoryAdapter) CheckSchema

func (a *MemoryAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)

CheckSchema verifies the event store "schema" (always exists for memory).

func (*MemoryAdapter) Close

func (a *MemoryAdapter) Close() error

Close releases any resources held by the adapter.

func (*MemoryAdapter) DeleteSnapshot

func (a *MemoryAdapter) DeleteSnapshot(ctx context.Context, streamID string) error

DeleteSnapshot removes the snapshot for the given stream.

func (*MemoryAdapter) EventCount

func (a *MemoryAdapter) EventCount() int

EventCount returns the total number of events stored.

func (*MemoryAdapter) ExecuteSQL

func (a *MemoryAdapter) ExecuteSQL(ctx context.Context, sql string) error

ExecuteSQL is a no-op for the memory adapter (no SQL to execute).

func (*MemoryAdapter) GenerateSchema

func (a *MemoryAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string

GenerateSchema returns an informational message for the memory adapter.

func (*MemoryAdapter) GetAppliedMigrations

func (a *MemoryAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)

GetAppliedMigrations returns the list of applied migration names.

func (*MemoryAdapter) GetCheckpoint

func (a *MemoryAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)

GetCheckpoint returns the last processed position for a projection.

func (*MemoryAdapter) GetDiagnosticInfo

func (a *MemoryAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)

GetDiagnosticInfo returns diagnostic information for the memory adapter.

func (*MemoryAdapter) GetEventStoreStats

func (a *MemoryAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)

GetEventStoreStats returns aggregate statistics about the event store.

func (*MemoryAdapter) GetLastPosition

func (a *MemoryAdapter) GetLastPosition(ctx context.Context) (uint64, error)

GetLastPosition returns the global position of the last stored event.

func (*MemoryAdapter) GetProjection

func (a *MemoryAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)

GetProjection returns information about a specific projection.

func (*MemoryAdapter) GetProjectionHealth

func (a *MemoryAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)

GetProjectionHealth returns projection health status for memory adapter.

func (*MemoryAdapter) GetStreamEvents

func (a *MemoryAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)

GetStreamEvents returns events from a stream with pagination for CLI display.

func (*MemoryAdapter) GetStreamInfo

func (a *MemoryAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)

GetStreamInfo returns metadata about a stream.

func (*MemoryAdapter) GetTotalEventCount

func (a *MemoryAdapter) GetTotalEventCount(ctx context.Context) (int64, error)

GetTotalEventCount returns the highest global position.

func (*MemoryAdapter) Initialize

func (a *MemoryAdapter) Initialize(ctx context.Context) error

Initialize is a no-op for the memory adapter.

func (*MemoryAdapter) ListProjections

func (a *MemoryAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)

ListProjections returns all registered projections.

func (*MemoryAdapter) ListStreams

func (a *MemoryAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)

ListStreams returns a list of stream summaries for CLI display.

func (*MemoryAdapter) Load

func (a *MemoryAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)

Load retrieves all events from a stream starting from the specified version.

func (*MemoryAdapter) LoadFromPosition

func (a *MemoryAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)

LoadFromPosition loads events starting from a global position. This is used by projection engines to catch up on historical events.

func (*MemoryAdapter) LoadSnapshot

func (a *MemoryAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)

LoadSnapshot retrieves the latest snapshot for the given stream.

func (*MemoryAdapter) Migrate added in v1.0.25

func (a *MemoryAdapter) Migrate(ctx context.Context) error

Migrate runs pending schema migrations. The memory adapter requires no schema, so this is a no-op (it only honors context cancellation and closed state) and exists to satisfy the adapters.Migrator interface for parity with PostgreSQL.

func (*MemoryAdapter) MigrationVersion added in v1.0.25

func (a *MemoryAdapter) MigrationVersion(ctx context.Context) (int, error)

MigrationVersion returns the current migration version. The memory adapter has no evolving schema, so it reports a constant version of 1 whenever the adapter is usable. This satisfies the adapters.Migrator interface.

func (*MemoryAdapter) Ping

func (a *MemoryAdapter) Ping(ctx context.Context) error

Ping checks if the adapter is healthy.

func (*MemoryAdapter) RecordMigration

func (a *MemoryAdapter) RecordMigration(ctx context.Context, name string) error

RecordMigration marks a migration as applied.

func (*MemoryAdapter) RemoveMigrationRecord

func (a *MemoryAdapter) RemoveMigrationRecord(ctx context.Context, name string) error

RemoveMigrationRecord removes a migration record (for rollback).

func (*MemoryAdapter) Reset

func (a *MemoryAdapter) Reset()

Reset clears all data. Useful for testing.

func (*MemoryAdapter) ResetProjectionCheckpoint

func (a *MemoryAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error

ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.

func (*MemoryAdapter) SaveSnapshot

func (a *MemoryAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error

SaveSnapshot stores a snapshot for the given stream.

func (*MemoryAdapter) SetCheckpoint

func (a *MemoryAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error

SetCheckpoint stores the last processed position for a projection.

func (*MemoryAdapter) SetProjectionStatus

func (a *MemoryAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error

SetProjectionStatus updates a projection's status.

func (*MemoryAdapter) StreamCount

func (a *MemoryAdapter) StreamCount() int

StreamCount returns the number of streams.

func (*MemoryAdapter) SubscribeAll

func (a *MemoryAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeAll subscribes to all events across all streams.

Live delivery is best-effort and lossy: each subscriber is served by a buffered channel (sized by SubscriptionOptions.BufferSize). Historical events are delivered synchronously while the subscription is being set up, but once the subscription is live, an event is dropped for any subscriber whose buffer is full at the moment of publication (the memory adapter never blocks the writer). When an event is dropped, the subscription's OnError callback (if provided) is invoked so the caller can detect the loss; consumers that must not miss events should drain the channel promptly, use a sufficiently large buffer, or rely on checkpoint-based catch-up via LoadFromPosition rather than purely on live delivery.

func (*MemoryAdapter) SubscribeCategory

func (a *MemoryAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeCategory subscribes to all events from streams in a category.

func (*MemoryAdapter) SubscribeStream

func (a *MemoryAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeStream subscribes to events from a specific stream.

type Option

type Option func(*MemoryAdapter)

Option configures a MemoryAdapter.

type OutboxStore

type OutboxStore struct {
	// contains filtered or unexported fields
}

OutboxStore provides an in-memory implementation of adapters.OutboxStore. This is primarily intended for testing and development purposes. Note: ScheduleInTx ignores the tx parameter since in-memory storage has no real transactions.

func NewOutboxStore

func NewOutboxStore() *OutboxStore

NewOutboxStore creates a new in-memory OutboxStore.

func (*OutboxStore) Cleanup

func (s *OutboxStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

Cleanup removes old completed messages.

func (*OutboxStore) Clear

func (s *OutboxStore) Clear()

Clear removes all messages (useful for testing).

func (*OutboxStore) Close

func (s *OutboxStore) Close() error

Close is a no-op for the in-memory store.

func (*OutboxStore) Count

func (s *OutboxStore) Count() int

Count returns the total number of messages stored.

func (*OutboxStore) CountByStatus

func (s *OutboxStore) CountByStatus() map[adapters.OutboxStatus]int

CountByStatus returns the count of messages by status.

func (*OutboxStore) FetchPending

func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)

FetchPending atomically claims up to limit pending messages for processing.

func (*OutboxStore) GetDeadLetterMessages

func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)

GetDeadLetterMessages retrieves dead-lettered messages.

func (*OutboxStore) Initialize

func (s *OutboxStore) Initialize(ctx context.Context) error

Initialize is a no-op for the in-memory store.

func (*OutboxStore) MarkCompleted

func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error

MarkCompleted marks messages as successfully delivered.

func (*OutboxStore) MarkFailed

func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error

MarkFailed marks a message as failed with an error description.

func (*OutboxStore) MoveToDeadLetter

func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)

MoveToDeadLetter transitions messages that exceeded per-message max_attempts or global maxAttempts to dead letter.

func (*OutboxStore) ReclaimStale added in v1.0.25

func (s *OutboxStore) ReclaimStale(ctx context.Context, olderThan time.Duration) (int64, error)

ReclaimStale resets messages stuck in OutboxProcessing whose last attempt is older than olderThan back to pending, recovering messages orphaned when a processor crashes between claiming and marking a message.

func (*OutboxStore) RetryFailed

func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)

RetryFailed resets eligible failed messages (below per-message max_attempts or global maxAttempts) to pending.

func (*OutboxStore) Schedule

func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error

Schedule stores outbox messages for later processing.

func (*OutboxStore) ScheduleInTx

func (s *OutboxStore) ScheduleInTx(ctx context.Context, _ interface{}, messages []*adapters.OutboxMessage) error

ScheduleInTx stores outbox messages (tx parameter is ignored for in-memory store).

type SagaStore

type SagaStore struct {
	// contains filtered or unexported fields
}

SagaStore provides an in-memory implementation of adapters.SagaStore. This is primarily intended for testing and development purposes.

func NewSagaStore

func NewSagaStore() *SagaStore

NewSagaStore creates a new in-memory SagaStore.

func (*SagaStore) All

func (s *SagaStore) All() []*adapters.SagaState

All returns all sagas (useful for testing).

func (*SagaStore) Clear

func (s *SagaStore) Clear()

Clear removes all sagas (useful for testing).

func (*SagaStore) Close

func (s *SagaStore) Close() error

Close releases any resources (no-op for in-memory implementation).

func (*SagaStore) Count

func (s *SagaStore) Count() int

Count returns the total number of sagas stored.

func (*SagaStore) CountByStatus

func (s *SagaStore) CountByStatus(ctx context.Context) (map[adapters.SagaStatus]int64, error)

CountByStatus returns the count of sagas by status.

func (*SagaStore) Delete

func (s *SagaStore) Delete(ctx context.Context, sagaID string) error

Delete removes a saga state.

func (*SagaStore) FindByCorrelationID

func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*adapters.SagaState, error)

FindByCorrelationID finds a saga by its correlation ID.

func (*SagaStore) FindByType

func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...adapters.SagaStatus) ([]*adapters.SagaState, error)

FindByType finds all sagas of a given type with the specified statuses.

func (*SagaStore) Load

func (s *SagaStore) Load(ctx context.Context, sagaID string) (*adapters.SagaState, error)

Load retrieves a saga state by ID.

func (*SagaStore) Save

func (s *SagaStore) Save(ctx context.Context, state *adapters.SagaState) error

Save persists a saga state with optimistic concurrency control.

Version semantics:

  • Version 0: Creates a new saga. If a saga with this ID already exists, it returns a ConcurrencyError.
  • Version > 0: Updates an existing saga. If no saga exists with this ID, it returns SagaNotFoundError. If the version doesn't match, it returns ConcurrencyError.

After a successful save, state.Version is incremented to reflect the new version.

Reject-on-missing contract: a Version > 0 save against a missing saga is rejected with SagaNotFoundError rather than silently creating the saga. This is intentional and is the stricter of the two possible behaviors: a non-zero version is a claim that a prior version already exists, so creating one here would mask a lost-write or an out-of-order Save. Callers that mean to create a saga must start from Version 0. Note this is intentionally stricter than the PostgreSQL adapter, whose upsert-style Save may create the row in this case; code that must behave identically across adapters should always create with Version 0 first.

type StreamNotFoundError

type StreamNotFoundError = adapters.StreamNotFoundError

StreamNotFoundError is an alias for adapters.StreamNotFoundError for backward compatibility.

type SubscriptionDropError added in v1.0.25

type SubscriptionDropError struct {
	// StreamID is the stream the dropped event belonged to.
	StreamID string

	// GlobalPosition is the global position of the dropped event.
	GlobalPosition uint64
}

SubscriptionDropError reports that a live event could not be delivered to a subscriber because its channel buffer was full and the event was dropped. It is passed to SubscriptionOptions.OnError (when provided) so callers can detect lossy live delivery from the memory adapter. The identifying fields pinpoint the dropped event so the caller may, for example, trigger a checkpoint-based catch-up from GlobalPosition.

func (*SubscriptionDropError) Error added in v1.0.25

func (e *SubscriptionDropError) Error() string

Error returns the error message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL