memory

package
v1.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 18, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Overview

Package memory provides an in-memory implementation of the event store adapter. This adapter is primarily intended for testing and development purposes.

Index

Constants

View Source
const (
	AnyVersion   = adapters.AnyVersion
	NoStream     = adapters.NoStream
	StreamExists = adapters.StreamExists
)

Version constants for optimistic concurrency control. These are re-exported from the adapters package for convenience.

Variables

View Source
var (
	// ErrAdapterClosed is returned when an operation is attempted on a closed adapter.
	ErrAdapterClosed = adapters.ErrAdapterClosed

	// ErrEmptyStreamID is returned when an empty stream ID is provided.
	ErrEmptyStreamID = adapters.ErrEmptyStreamID

	// ErrNoEvents is returned when attempting to append zero events.
	ErrNoEvents = adapters.ErrNoEvents

	// ErrConcurrencyConflict is returned when optimistic concurrency check fails.
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict

	// ErrStreamNotFound is returned when a stream does not exist.
	ErrStreamNotFound = adapters.ErrStreamNotFound

	// ErrInvalidVersion is returned when an invalid version is specified.
	ErrInvalidVersion = adapters.ErrInvalidVersion
)

Sentinel errors for the memory adapter. These are aliases to the adapters package errors for compatibility with errors.Is().

View Source
var NewConcurrencyError = adapters.NewConcurrencyError

NewConcurrencyError is an alias for adapters.NewConcurrencyError for backward compatibility.

View Source
var NewStreamNotFoundError = adapters.NewStreamNotFoundError

NewStreamNotFoundError is an alias for adapters.NewStreamNotFoundError for backward compatibility.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	ProjectionName string
	Position       uint64
	UpdatedAt      time.Time
}

Checkpoint represents a stored checkpoint.

type CheckpointStore

type CheckpointStore struct {
	// contains filtered or unexported fields
}

CheckpointStore provides an in-memory implementation of CheckpointAdapter.

func NewCheckpointStore

func NewCheckpointStore() *CheckpointStore

NewCheckpointStore creates a new in-memory checkpoint store.

func (*CheckpointStore) Clear

func (s *CheckpointStore) Clear()

Clear removes all checkpoints.

func (*CheckpointStore) DeleteCheckpoint

func (s *CheckpointStore) DeleteCheckpoint(_ context.Context, projectionName string) error

DeleteCheckpoint removes a checkpoint for a projection.

func (*CheckpointStore) GetAllCheckpoints

func (s *CheckpointStore) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)

GetAllCheckpoints returns all stored checkpoints.

func (*CheckpointStore) GetCheckpoint

func (s *CheckpointStore) GetCheckpoint(_ context.Context, projectionName string) (uint64, error)

GetCheckpoint retrieves the current position for a projection. Returns 0 if no checkpoint exists.

func (*CheckpointStore) GetCheckpointWithTimestamp

func (s *CheckpointStore) GetCheckpointWithTimestamp(ctx context.Context, projectionName string) (*Checkpoint, error)

GetCheckpointWithTimestamp retrieves the checkpoint with its last update time.

func (*CheckpointStore) Len

func (s *CheckpointStore) Len() int

Len returns the number of checkpoints.

func (*CheckpointStore) SetCheckpoint

func (s *CheckpointStore) SetCheckpoint(_ context.Context, projectionName string, position uint64) error

SetCheckpoint stores the position for a projection.

type ConcurrencyError

type ConcurrencyError = adapters.ConcurrencyError

ConcurrencyError is an alias for adapters.ConcurrencyError for backward compatibility.

type IdempotencyStore

type IdempotencyStore struct {
	// contains filtered or unexported fields
}

IdempotencyStore provides an in-memory implementation of adapters.IdempotencyStore. It is useful for testing and development but should not be used in production as it does not persist data across restarts.

func NewIdempotencyStore

func NewIdempotencyStore(opts ...IdempotencyStoreOption) *IdempotencyStore

NewIdempotencyStore creates a new in-memory IdempotencyStore.

func (*IdempotencyStore) Cleanup

func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

Cleanup removes records older than the specified duration. Returns the number of records deleted.

func (*IdempotencyStore) Clear

func (s *IdempotencyStore) Clear()

Clear removes all records from the store. Useful for testing.

func (*IdempotencyStore) Close

func (s *IdempotencyStore) Close() error

Close stops the cleanup goroutine and releases resources. It is safe to call Close() multiple times.

func (*IdempotencyStore) Delete

func (s *IdempotencyStore) Delete(ctx context.Context, key string) error

Delete removes an idempotency record by key.

func (*IdempotencyStore) Exists

func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)

Exists checks if a record with the given key exists and is not expired.

func (*IdempotencyStore) Get

Get retrieves an idempotency record by key. Returns nil if the record doesn't exist or is expired.

func (*IdempotencyStore) Len

func (s *IdempotencyStore) Len() int

Len returns the number of records in the store. Useful for testing.

func (*IdempotencyStore) Store

Store saves a new idempotency record.

type IdempotencyStoreOption

type IdempotencyStoreOption func(*IdempotencyStore)

IdempotencyStoreOption configures an IdempotencyStore

func WithCleanupInterval

func WithCleanupInterval(interval time.Duration) IdempotencyStoreOption

WithCleanupInterval sets the interval for automatic cleanup. Set to 0 to disable automatic cleanup.

func WithMaxAge

func WithMaxAge(maxAge time.Duration) IdempotencyStoreOption

WithMaxAge sets the maximum age for records. Records older than this will be cleaned up.

type MemoryAdapter

type MemoryAdapter struct {
	// contains filtered or unexported fields
}

MemoryAdapter is an in-memory implementation of EventStoreAdapter. It is thread-safe and suitable for unit testing.

func NewAdapter

func NewAdapter(opts ...Option) *MemoryAdapter

NewAdapter creates a new in-memory event store adapter.

func (*MemoryAdapter) Append

func (a *MemoryAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64) ([]adapters.StoredEvent, error)

Append stores events to the specified stream with optimistic concurrency control.

func (*MemoryAdapter) CheckSchema

func (a *MemoryAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)

CheckSchema verifies the event store "schema" (always exists for memory).

func (*MemoryAdapter) Close

func (a *MemoryAdapter) Close() error

Close releases any resources held by the adapter.

func (*MemoryAdapter) DeleteSnapshot

func (a *MemoryAdapter) DeleteSnapshot(ctx context.Context, streamID string) error

DeleteSnapshot removes the snapshot for the given stream.

func (*MemoryAdapter) EventCount

func (a *MemoryAdapter) EventCount() int

EventCount returns the total number of events stored.

func (*MemoryAdapter) ExecuteSQL

func (a *MemoryAdapter) ExecuteSQL(ctx context.Context, sql string) error

ExecuteSQL is a no-op for the memory adapter (no SQL to execute).

func (*MemoryAdapter) GenerateSchema

func (a *MemoryAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string

GenerateSchema returns an informational message for the memory adapter.

func (*MemoryAdapter) GetAppliedMigrations

func (a *MemoryAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)

GetAppliedMigrations returns the list of applied migration names.

func (*MemoryAdapter) GetCheckpoint

func (a *MemoryAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)

GetCheckpoint returns the last processed position for a projection.

func (*MemoryAdapter) GetDiagnosticInfo

func (a *MemoryAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)

GetDiagnosticInfo returns diagnostic information for the memory adapter.

func (*MemoryAdapter) GetEventStoreStats

func (a *MemoryAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)

GetEventStoreStats returns aggregate statistics about the event store.

func (*MemoryAdapter) GetLastPosition

func (a *MemoryAdapter) GetLastPosition(ctx context.Context) (uint64, error)

GetLastPosition returns the global position of the last stored event.

func (*MemoryAdapter) GetProjection

func (a *MemoryAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)

GetProjection returns information about a specific projection.

func (*MemoryAdapter) GetProjectionHealth

func (a *MemoryAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)

GetProjectionHealth returns projection health status for memory adapter.

func (*MemoryAdapter) GetStreamEvents

func (a *MemoryAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)

GetStreamEvents returns events from a stream with pagination for CLI display.

func (*MemoryAdapter) GetStreamInfo

func (a *MemoryAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)

GetStreamInfo returns metadata about a stream.

func (*MemoryAdapter) GetTotalEventCount

func (a *MemoryAdapter) GetTotalEventCount(ctx context.Context) (int64, error)

GetTotalEventCount returns the highest global position.

func (*MemoryAdapter) Initialize

func (a *MemoryAdapter) Initialize(ctx context.Context) error

Initialize is a no-op for the memory adapter.

func (*MemoryAdapter) ListProjections

func (a *MemoryAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)

ListProjections returns all registered projections.

func (*MemoryAdapter) ListStreams

func (a *MemoryAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)

ListStreams returns a list of stream summaries for CLI display.

func (*MemoryAdapter) Load

func (a *MemoryAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)

Load retrieves all events from a stream starting from the specified version.

func (*MemoryAdapter) LoadFromPosition

func (a *MemoryAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)

LoadFromPosition loads events starting from a global position. This is used by projection engines to catch up on historical events.

func (*MemoryAdapter) LoadSnapshot

func (a *MemoryAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)

LoadSnapshot retrieves the latest snapshot for the given stream.

func (*MemoryAdapter) Ping

func (a *MemoryAdapter) Ping(ctx context.Context) error

Ping checks if the adapter is healthy.

func (*MemoryAdapter) RecordMigration

func (a *MemoryAdapter) RecordMigration(ctx context.Context, name string) error

RecordMigration marks a migration as applied.

func (*MemoryAdapter) RemoveMigrationRecord

func (a *MemoryAdapter) RemoveMigrationRecord(ctx context.Context, name string) error

RemoveMigrationRecord removes a migration record (for rollback).

func (*MemoryAdapter) Reset

func (a *MemoryAdapter) Reset()

Reset clears all data. Useful for testing.

func (*MemoryAdapter) ResetProjectionCheckpoint

func (a *MemoryAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error

ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.

func (*MemoryAdapter) SaveSnapshot

func (a *MemoryAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error

SaveSnapshot stores a snapshot for the given stream.

func (*MemoryAdapter) SetCheckpoint

func (a *MemoryAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error

SetCheckpoint stores the last processed position for a projection.

func (*MemoryAdapter) SetProjectionStatus

func (a *MemoryAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error

SetProjectionStatus updates a projection's status.

func (*MemoryAdapter) StreamCount

func (a *MemoryAdapter) StreamCount() int

StreamCount returns the number of streams.

func (*MemoryAdapter) SubscribeAll

func (a *MemoryAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeAll subscribes to all events across all streams.

func (*MemoryAdapter) SubscribeCategory

func (a *MemoryAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeCategory subscribes to all events from streams in a category.

func (*MemoryAdapter) SubscribeStream

func (a *MemoryAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)

SubscribeStream subscribes to events from a specific stream.

type Option

type Option func(*MemoryAdapter)

Option configures a MemoryAdapter.

type OutboxStore

type OutboxStore struct {
	// contains filtered or unexported fields
}

OutboxStore provides an in-memory implementation of adapters.OutboxStore. This is primarily intended for testing and development purposes. Note: ScheduleInTx ignores the tx parameter since in-memory storage has no real transactions.

func NewOutboxStore

func NewOutboxStore() *OutboxStore

NewOutboxStore creates a new in-memory OutboxStore.

func (*OutboxStore) Cleanup

func (s *OutboxStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)

Cleanup removes old completed messages.

func (*OutboxStore) Clear

func (s *OutboxStore) Clear()

Clear removes all messages (useful for testing).

func (*OutboxStore) Close

func (s *OutboxStore) Close() error

Close is a no-op for the in-memory store.

func (*OutboxStore) Count

func (s *OutboxStore) Count() int

Count returns the total number of messages stored.

func (*OutboxStore) CountByStatus

func (s *OutboxStore) CountByStatus() map[adapters.OutboxStatus]int

CountByStatus returns the count of messages by status.

func (*OutboxStore) FetchPending

func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)

FetchPending atomically claims up to limit pending messages for processing.

func (*OutboxStore) GetDeadLetterMessages

func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)

GetDeadLetterMessages retrieves dead-lettered messages.

func (*OutboxStore) Initialize

func (s *OutboxStore) Initialize(ctx context.Context) error

Initialize is a no-op for the in-memory store.

func (*OutboxStore) MarkCompleted

func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error

MarkCompleted marks messages as successfully delivered.

func (*OutboxStore) MarkFailed

func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error

MarkFailed marks a message as failed with an error description.

func (*OutboxStore) MoveToDeadLetter

func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)

MoveToDeadLetter transitions messages that exceeded per-message max_attempts or global maxAttempts to dead letter.

func (*OutboxStore) RetryFailed

func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)

RetryFailed resets eligible failed messages (below per-message max_attempts or global maxAttempts) to pending.

func (*OutboxStore) Schedule

func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error

Schedule stores outbox messages for later processing.

func (*OutboxStore) ScheduleInTx

func (s *OutboxStore) ScheduleInTx(ctx context.Context, _ interface{}, messages []*adapters.OutboxMessage) error

ScheduleInTx stores outbox messages (tx parameter is ignored for in-memory store).

type SagaStore

type SagaStore struct {
	// contains filtered or unexported fields
}

SagaStore provides an in-memory implementation of adapters.SagaStore. This is primarily intended for testing and development purposes.

func NewSagaStore

func NewSagaStore() *SagaStore

NewSagaStore creates a new in-memory SagaStore.

func (*SagaStore) All

func (s *SagaStore) All() []*adapters.SagaState

All returns all sagas (useful for testing).

func (*SagaStore) Clear

func (s *SagaStore) Clear()

Clear removes all sagas (useful for testing).

func (*SagaStore) Close

func (s *SagaStore) Close() error

Close releases any resources (no-op for in-memory implementation).

func (*SagaStore) Count

func (s *SagaStore) Count() int

Count returns the total number of sagas stored.

func (*SagaStore) CountByStatus

func (s *SagaStore) CountByStatus(ctx context.Context) (map[adapters.SagaStatus]int64, error)

CountByStatus returns the count of sagas by status.

func (*SagaStore) Delete

func (s *SagaStore) Delete(ctx context.Context, sagaID string) error

Delete removes a saga state.

func (*SagaStore) FindByCorrelationID

func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*adapters.SagaState, error)

FindByCorrelationID finds a saga by its correlation ID.

func (*SagaStore) FindByType

func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...adapters.SagaStatus) ([]*adapters.SagaState, error)

FindByType finds all sagas of a given type with the specified statuses.

func (*SagaStore) Load

func (s *SagaStore) Load(ctx context.Context, sagaID string) (*adapters.SagaState, error)

Load retrieves a saga state by ID.

func (*SagaStore) Save

func (s *SagaStore) Save(ctx context.Context, state *adapters.SagaState) error

Save persists a saga state with optimistic concurrency control.

Version semantics:

  • Version 0: Creates a new saga. If a saga with this ID already exists, it returns a ConcurrencyError.
  • Version > 0: Updates an existing saga. If no saga exists with this ID, it returns SagaNotFoundError. If the version doesn't match, it returns ConcurrencyError.

After a successful save, state.Version is incremented to reflect the new version.

type StreamNotFoundError

type StreamNotFoundError = adapters.StreamNotFoundError

StreamNotFoundError is an alias for adapters.StreamNotFoundError for backward compatibility.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL