Documentation ¶
Overview ¶
Package memory provides an in-memory implementation of the event store adapter. This adapter is primarily intended for testing and development purposes.
Index ¶
- Constants
- Variables
- type AuditStore
- func (s *AuditStore) Append(ctx context.Context, entry *adapters.AuditEntry) error
- func (s *AuditStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *AuditStore) Clear()
- func (s *AuditStore) Close() error
- func (s *AuditStore) Count(ctx context.Context, q adapters.AuditQuery) (int64, error)
- func (s *AuditStore) Find(ctx context.Context, q adapters.AuditQuery) ([]*adapters.AuditEntry, error)
- func (s *AuditStore) Initialize(ctx context.Context) error
- func (s *AuditStore) Len() int
- type Checkpoint
- type CheckpointStore
- func (s *CheckpointStore) Clear()
- func (s *CheckpointStore) DeleteCheckpoint(_ context.Context, projectionName string) error
- func (s *CheckpointStore) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
- func (s *CheckpointStore) GetCheckpoint(_ context.Context, projectionName string) (uint64, error)
- func (s *CheckpointStore) GetCheckpointWithTimestamp(ctx context.Context, projectionName string) (*Checkpoint, error)
- func (s *CheckpointStore) Len() int
- func (s *CheckpointStore) SetCheckpoint(_ context.Context, projectionName string, position uint64) error
- type ConcurrencyError
- type IdempotencyStore
- func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *IdempotencyStore) Clear()
- func (s *IdempotencyStore) Close() error
- func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
- func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)
- func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
- func (s *IdempotencyStore) Len() int
- func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
- func (s *IdempotencyStore) StoreIfAbsent(ctx context.Context, record *adapters.IdempotencyRecord) (bool, error)
- type IdempotencyStoreOption
- type MemoryAdapter
- func (a *MemoryAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, ...) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, ...) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
- func (a *MemoryAdapter) Close() error
- func (a *MemoryAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
- func (a *MemoryAdapter) EventCount() int
- func (a *MemoryAdapter) ExecuteSQL(ctx context.Context, sql string) error
- func (a *MemoryAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
- func (a *MemoryAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
- func (a *MemoryAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
- func (a *MemoryAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
- func (a *MemoryAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
- func (a *MemoryAdapter) GetLastPosition(ctx context.Context) (uint64, error)
- func (a *MemoryAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
- func (a *MemoryAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
- func (a *MemoryAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
- func (a *MemoryAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
- func (a *MemoryAdapter) Initialize(ctx context.Context) error
- func (a *MemoryAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
- func (a *MemoryAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
- func (a *MemoryAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
- func (a *MemoryAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
- func (a *MemoryAdapter) Migrate(ctx context.Context) error
- func (a *MemoryAdapter) MigrationVersion(ctx context.Context) (int, error)
- func (a *MemoryAdapter) Ping(ctx context.Context) error
- func (a *MemoryAdapter) RecordMigration(ctx context.Context, name string) error
- func (a *MemoryAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
- func (a *MemoryAdapter) Reset()
- func (a *MemoryAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
- func (a *MemoryAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
- func (a *MemoryAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
- func (a *MemoryAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error
- func (a *MemoryAdapter) StreamCount() int
- func (a *MemoryAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
- func (a *MemoryAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, ...) (<-chan adapters.StoredEvent, error)
- func (a *MemoryAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, ...) (<-chan adapters.StoredEvent, error)
- type Option
- type OutboxStore
- func (s *OutboxStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *OutboxStore) Clear()
- func (s *OutboxStore) Close() error
- func (s *OutboxStore) Count() int
- func (s *OutboxStore) CountByStatus() map[adapters.OutboxStatus]int
- func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
- func (s *OutboxStore) Initialize(ctx context.Context) error
- func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
- func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error
- func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) ReclaimStale(ctx context.Context, olderThan time.Duration) (int64, error)
- func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)
- func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
- func (s *OutboxStore) ScheduleInTx(ctx context.Context, _ interface{}, messages []*adapters.OutboxMessage) error
- type SagaStore
- func (s *SagaStore) All() []*adapters.SagaState
- func (s *SagaStore) Clear()
- func (s *SagaStore) Close() error
- func (s *SagaStore) Count() int
- func (s *SagaStore) CountByStatus(ctx context.Context) (map[adapters.SagaStatus]int64, error)
- func (s *SagaStore) Delete(ctx context.Context, sagaID string) error
- func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*adapters.SagaState, error)
- func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...adapters.SagaStatus) ([]*adapters.SagaState, error)
- func (s *SagaStore) Load(ctx context.Context, sagaID string) (*adapters.SagaState, error)
- func (s *SagaStore) Save(ctx context.Context, state *adapters.SagaState) error
- type StreamNotFoundError
- type SubscriptionDropError
Constants ¶
const (
	AnyVersion   = adapters.AnyVersion
	NoStream     = adapters.NoStream
	StreamExists = adapters.StreamExists
)
Version constants for optimistic concurrency control. These are re-exported from the adapters package for convenience.
Variables ¶
var (
	// ErrAdapterClosed is returned when an operation is attempted on a closed adapter.
	ErrAdapterClosed = adapters.ErrAdapterClosed
	// ErrEmptyStreamID is returned when an empty stream ID is provided.
	ErrEmptyStreamID = adapters.ErrEmptyStreamID
	// ErrNoEvents is returned when attempting to append zero events.
	ErrNoEvents = adapters.ErrNoEvents
	// ErrConcurrencyConflict is returned when optimistic concurrency check fails.
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict
	// ErrStreamNotFound is returned when a stream does not exist.
	ErrStreamNotFound = adapters.ErrStreamNotFound
	// ErrInvalidVersion is returned when an invalid version is specified.
	ErrInvalidVersion = adapters.ErrInvalidVersion
)
Sentinel errors for the memory adapter. These are aliases to the adapters package errors for compatibility with errors.Is().
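Because the sentinels are the same error values under a second name, errors.Is should match whichever package's identifier the caller uses, including through wrapping. The following is a minimal, self-contained sketch of that behavior; adaptersErrStreamNotFound and loadStream are hypothetical local stand-ins, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// adaptersErrStreamNotFound stands in for adapters.ErrStreamNotFound
// (a hypothetical local copy so the sketch compiles on its own).
var adaptersErrStreamNotFound = errors.New("stream not found")

// ErrStreamNotFound mirrors the re-export shown above: the same error
// value exposed under a second name.
var ErrStreamNotFound = adaptersErrStreamNotFound

// loadStream is a hypothetical helper that wraps the sentinel with context.
func loadStream(streamID string) error {
	return fmt.Errorf("load %q: %w", streamID, adaptersErrStreamNotFound)
}

// matchesAlias reports whether err matches the re-exported sentinel.
func matchesAlias(err error) bool {
	return errors.Is(err, ErrStreamNotFound)
}

func main() {
	err := loadStream("order-42")
	fmt.Println(matchesAlias(err)) // prints "true"
}
```

Comparing with errors.Is rather than == keeps the check working when a caller or intermediate layer wraps the error with %w.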
var NewConcurrencyError = adapters.NewConcurrencyError
NewConcurrencyError is an alias for adapters.NewConcurrencyError for backward compatibility.
var NewStreamNotFoundError = adapters.NewStreamNotFoundError
NewStreamNotFoundError is an alias for adapters.NewStreamNotFoundError for backward compatibility.
Functions ¶
This section is empty.
Types ¶
type AuditStore ¶ added in v1.1.0
type AuditStore struct {
// contains filtered or unexported fields
}
AuditStore provides an in-memory implementation of adapters.AuditStore. It is useful for testing and development but should not be used in production as it does not persist data across restarts.
func NewAuditStore ¶ added in v1.1.0
func NewAuditStore() *AuditStore
NewAuditStore creates a new in-memory AuditStore.
func (*AuditStore) Append ¶ added in v1.1.0
func (s *AuditStore) Append(ctx context.Context, entry *adapters.AuditEntry) error
Append stores a copy of the audit entry.
func (*AuditStore) Cleanup ¶ added in v1.1.0
func (s *AuditStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
Cleanup removes entries with a Timestamp older than olderThan ago. Returns the number of entries removed.
func (*AuditStore) Clear ¶ added in v1.1.0
func (s *AuditStore) Clear()
Clear removes all entries from the store. Useful for testing.
func (*AuditStore) Close ¶ added in v1.1.0
func (s *AuditStore) Close() error
Close is a no-op for the in-memory store.
func (*AuditStore) Count ¶ added in v1.1.0
func (s *AuditStore) Count(ctx context.Context, q adapters.AuditQuery) (int64, error)
Count returns the number of entries matching the query. It ignores Limit and Offset.
func (*AuditStore) Find ¶ added in v1.1.0
func (s *AuditStore) Find(ctx context.Context, q adapters.AuditQuery) ([]*adapters.AuditEntry, error)
Find returns audit entries matching the query, honoring Order/Limit/Offset.
func (*AuditStore) Initialize ¶ added in v1.1.0
func (s *AuditStore) Initialize(ctx context.Context) error
Initialize is a no-op for the in-memory store.
func (*AuditStore) Len ¶ added in v1.1.0
func (s *AuditStore) Len() int
Len returns the number of entries in the store. Useful for testing.
type Checkpoint ¶
Checkpoint represents a stored checkpoint.
type CheckpointStore ¶
type CheckpointStore struct {
// contains filtered or unexported fields
}
CheckpointStore provides an in-memory implementation of CheckpointAdapter.
func NewCheckpointStore ¶
func NewCheckpointStore() *CheckpointStore
NewCheckpointStore creates a new in-memory checkpoint store.
func (*CheckpointStore) DeleteCheckpoint ¶
func (s *CheckpointStore) DeleteCheckpoint(_ context.Context, projectionName string) error
DeleteCheckpoint removes a checkpoint for a projection.
func (*CheckpointStore) GetAllCheckpoints ¶
func (s *CheckpointStore) GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
GetAllCheckpoints returns all stored checkpoints.
func (*CheckpointStore) GetCheckpoint ¶
func (s *CheckpointStore) GetCheckpoint(_ context.Context, projectionName string) (uint64, error)
GetCheckpoint retrieves the current position for a projection. Returns 0 if no checkpoint exists.
func (*CheckpointStore) GetCheckpointWithTimestamp ¶
func (s *CheckpointStore) GetCheckpointWithTimestamp(ctx context.Context, projectionName string) (*Checkpoint, error)
GetCheckpointWithTimestamp retrieves the checkpoint with its last update time.
func (*CheckpointStore) Len ¶
func (s *CheckpointStore) Len() int
Len returns the number of checkpoints.
func (*CheckpointStore) SetCheckpoint ¶
func (s *CheckpointStore) SetCheckpoint(_ context.Context, projectionName string, position uint64) error
SetCheckpoint stores the position for a projection.
type ConcurrencyError ¶
type ConcurrencyError = adapters.ConcurrencyError
ConcurrencyError is an alias for adapters.ConcurrencyError for backward compatibility.
type IdempotencyStore ¶
type IdempotencyStore struct {
// contains filtered or unexported fields
}
IdempotencyStore provides an in-memory implementation of adapters.IdempotencyStore. It is useful for testing and development but should not be used in production as it does not persist data across restarts.
func NewIdempotencyStore ¶
func NewIdempotencyStore(opts ...IdempotencyStoreOption) *IdempotencyStore
NewIdempotencyStore creates a new in-memory IdempotencyStore.
func (*IdempotencyStore) Cleanup ¶
func (s *IdempotencyStore) Cleanup(ctx context.Context, olderThan time.Duration) (int64, error)
Cleanup removes records older than the specified duration. Returns the number of records deleted.
func (*IdempotencyStore) Clear ¶
func (s *IdempotencyStore) Clear()
Clear removes all records from the store. Useful for testing.
func (*IdempotencyStore) Close ¶
func (s *IdempotencyStore) Close() error
Close stops the cleanup goroutine and releases resources. It is safe to call Close() multiple times.
func (*IdempotencyStore) Delete ¶
func (s *IdempotencyStore) Delete(ctx context.Context, key string) error
Delete removes an idempotency record by key.
func (*IdempotencyStore) Exists ¶
func (s *IdempotencyStore) Exists(ctx context.Context, key string) (bool, error)
Exists checks if a record with the given key exists and is not expired.
func (*IdempotencyStore) Get ¶
func (s *IdempotencyStore) Get(ctx context.Context, key string) (*adapters.IdempotencyRecord, error)
Get retrieves an idempotency record by key. Returns nil if the record doesn't exist or is expired.
func (*IdempotencyStore) Len ¶
func (s *IdempotencyStore) Len() int
Len returns the number of records in the store. Useful for testing.
func (*IdempotencyStore) Store ¶
func (s *IdempotencyStore) Store(ctx context.Context, record *adapters.IdempotencyRecord) error
Store saves a new idempotency record.
func (*IdempotencyStore) StoreIfAbsent ¶ added in v1.0.25
func (s *IdempotencyStore) StoreIfAbsent(ctx context.Context, record *adapters.IdempotencyRecord) (bool, error)
StoreIfAbsent atomically stores the record only if no live (non-expired) record exists for its key. An expired record is treated as absent and overwritten. Returns true if the record was stored.
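The store-if-absent semantics described above can be sketched with a mutex-guarded map. This is an illustration under stated assumptions, not the package's implementation: the record type, its ExpiresAt field, and the explicit now parameter are all hypothetical and exist only to make the sketch self-contained and deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// record is a hypothetical stand-in for adapters.IdempotencyRecord.
type record struct {
	Key       string
	ExpiresAt time.Time
}

// store is a minimal mutex-guarded map used only for this sketch.
type store struct {
	mu      sync.Mutex
	records map[string]record
}

func newStore() *store { return &store{records: make(map[string]record)} }

// storeIfAbsent writes rec unless a live record already holds its key.
// The existence check and the write share one critical section, which
// is what makes the operation atomic.
func (s *store) storeIfAbsent(rec record, now time.Time) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if existing, ok := s.records[rec.Key]; ok && now.Before(existing.ExpiresAt) {
		return false // a live record wins
	}
	s.records[rec.Key] = rec // absent or expired: store or overwrite
	return true
}

// demo stores a record, retries while it is live, then retries after expiry.
func demo() []bool {
	s := newStore()
	now := time.Now()
	later := now.Add(2 * time.Minute)
	first := record{Key: "req-1", ExpiresAt: now.Add(time.Minute)}
	renewed := record{Key: "req-1", ExpiresAt: later.Add(time.Minute)}
	return []bool{
		s.storeIfAbsent(first, now),     // key is absent
		s.storeIfAbsent(first, now),     // live record exists
		s.storeIfAbsent(renewed, later), // previous record has expired
	}
}

func main() {
	fmt.Println(demo()) // prints "[true false true]"
}
```

A separate Exists call followed by Store would leave a window in which two callers both see the key as absent; performing both steps under one lock closes that window.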
type IdempotencyStoreOption ¶
type IdempotencyStoreOption func(*IdempotencyStore)
IdempotencyStoreOption configures an IdempotencyStore.
func WithCleanupInterval ¶
func WithCleanupInterval(interval time.Duration) IdempotencyStoreOption
WithCleanupInterval sets the interval for automatic cleanup. Set to 0 to disable automatic cleanup.
func WithMaxAge ¶
func WithMaxAge(maxAge time.Duration) IdempotencyStoreOption
WithMaxAge sets the maximum age for records. Records older than this will be cleaned up.
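IdempotencyStoreOption follows the functional-options pattern: each option is a function that mutates the store during construction. The sketch below reproduces the pattern with local stand-in types; the field names and the default values are assumptions, since the real defaults are not documented here.

```go
package main

import (
	"fmt"
	"time"
)

// idempotencyStore is a local stand-in; its fields are hypothetical.
type idempotencyStore struct {
	cleanupInterval time.Duration
	maxAge          time.Duration
}

// option has the same shape as IdempotencyStoreOption.
type option func(*idempotencyStore)

// withCleanupInterval mirrors WithCleanupInterval; 0 disables automatic cleanup.
func withCleanupInterval(d time.Duration) option {
	return func(s *idempotencyStore) { s.cleanupInterval = d }
}

// withMaxAge mirrors WithMaxAge.
func withMaxAge(d time.Duration) option {
	return func(s *idempotencyStore) { s.maxAge = d }
}

// newStore applies the options over assumed defaults (the real defaults
// of NewIdempotencyStore are not known from this page).
func newStore(opts ...option) *idempotencyStore {
	s := &idempotencyStore{cleanupInterval: time.Minute, maxAge: 24 * time.Hour}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// describe renders the effective configuration.
func describe(s *idempotencyStore) string {
	return fmt.Sprintf("cleanup=%s maxAge=%s", s.cleanupInterval, s.maxAge)
}

// demo builds a store with automatic cleanup disabled and a one-hour max age.
func demo() string {
	return describe(newStore(withCleanupInterval(0), withMaxAge(time.Hour)))
}

func main() {
	fmt.Println(demo()) // prints "cleanup=0s maxAge=1h0m0s"
}
```

With the real package, the equivalent call would presumably read NewIdempotencyStore(WithCleanupInterval(0), WithMaxAge(time.Hour)), which is a typical configuration for tests that call Cleanup explicitly.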
type MemoryAdapter ¶
type MemoryAdapter struct {
// contains filtered or unexported fields
}
MemoryAdapter is an in-memory implementation of EventStoreAdapter. It is safe for concurrent use and suitable for unit testing.
func NewAdapter ¶
func NewAdapter(opts ...Option) *MemoryAdapter
NewAdapter creates a new in-memory event store adapter.
func (*MemoryAdapter) Append ¶
func (a *MemoryAdapter) Append(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64) ([]adapters.StoredEvent, error)
Append stores events to the specified stream with optimistic concurrency control.
func (*MemoryAdapter) AppendWithOutbox ¶ added in v1.0.25
func (a *MemoryAdapter) AppendWithOutbox(ctx context.Context, streamID string, events []adapters.EventRecord, expectedVersion int64, outbox adapters.OutboxStore, outboxMessages []*adapters.OutboxMessage) ([]adapters.StoredEvent, error)
AppendWithOutbox atomically appends events and schedules outbox messages into the provided store, all under the adapter's write lock.
The append is validated and the messages are scheduled BEFORE any events are written, so a version conflict or a scheduling failure leaves neither the events nor the messages behind, mirroring the single-transaction guarantee of the PostgreSQL adapter. Messages are scheduled into the caller-provided store (the one configured on EventStoreWithOutbox), keeping the atomic path consistent with the store the caller reads from.
Scheduling runs while a.mu is held, so the provided store must not call back into this adapter; the in-process memory OutboxStore does not.
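The validate, schedule, then write ordering can be sketched as follows. All types here are simplified local stand-ins (events and messages are plain strings, and expectedVersion is compared directly with the stream length), so this shows the ordering only and is not the adapter's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errConflict = errors.New("concurrency conflict")

// outbox is a reduced, hypothetical stand-in for adapters.OutboxStore.
type outbox interface {
	Schedule(msgs []string) error
}

// memOutbox records scheduled messages and can be told to fail.
type memOutbox struct {
	fail bool
	msgs []string
}

func (o *memOutbox) Schedule(msgs []string) error {
	if o.fail {
		return errors.New("schedule failed")
	}
	o.msgs = append(o.msgs, msgs...)
	return nil
}

type adapter struct {
	mu      sync.Mutex
	streams map[string][]string
}

// appendWithOutbox validates, schedules, and only then writes, all while
// holding the write lock, so any failure leaves nothing behind.
func (a *adapter) appendWithOutbox(streamID string, events []string, expectedVersion int, ob outbox, msgs []string) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if len(a.streams[streamID]) != expectedVersion { // 1. validate
		return errConflict
	}
	if err := ob.Schedule(msgs); err != nil { // 2. schedule
		return err
	}
	a.streams[streamID] = append(a.streams[streamID], events...) // 3. write
	return nil
}

// demo runs a failed scheduling, a version conflict, and one success,
// then reports how many events and messages were kept.
func demo() (events, messages int) {
	a := &adapter{streams: make(map[string][]string)}
	_ = a.appendWithOutbox("s", []string{"e1"}, 0, &memOutbox{fail: true}, []string{"m1"})
	ob := &memOutbox{}
	_ = a.appendWithOutbox("s", []string{"e1"}, 5, ob, []string{"m1"})
	_ = a.appendWithOutbox("s", []string{"e1"}, 0, ob, []string{"m1"})
	return len(a.streams["s"]), len(ob.msgs)
}

func main() {
	events, messages := demo()
	fmt.Println(events, messages) // prints "1 1"
}
```

Because Schedule runs while the lock is held, an outbox implementation that called back into the adapter would deadlock in this sketch, which is the constraint noted above.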
func (*MemoryAdapter) CheckSchema ¶
func (a *MemoryAdapter) CheckSchema(ctx context.Context, tableName string) (*adapters.SchemaCheckResult, error)
CheckSchema verifies the event store "schema" (always exists for memory).
func (*MemoryAdapter) Close ¶
func (a *MemoryAdapter) Close() error
Close releases any resources held by the adapter.
func (*MemoryAdapter) DeleteSnapshot ¶
func (a *MemoryAdapter) DeleteSnapshot(ctx context.Context, streamID string) error
DeleteSnapshot removes the snapshot for the given stream.
func (*MemoryAdapter) EventCount ¶
func (a *MemoryAdapter) EventCount() int
EventCount returns the total number of events stored.
func (*MemoryAdapter) ExecuteSQL ¶
func (a *MemoryAdapter) ExecuteSQL(ctx context.Context, sql string) error
ExecuteSQL is a no-op for the memory adapter (no SQL to execute).
func (*MemoryAdapter) GenerateSchema ¶
func (a *MemoryAdapter) GenerateSchema(projectName, tableName, snapshotTableName, outboxTableName string) string
GenerateSchema returns an informational message for the memory adapter.
func (*MemoryAdapter) GetAppliedMigrations ¶
func (a *MemoryAdapter) GetAppliedMigrations(ctx context.Context) ([]string, error)
GetAppliedMigrations returns the list of applied migration names.
func (*MemoryAdapter) GetCheckpoint ¶
func (a *MemoryAdapter) GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)
GetCheckpoint returns the last processed position for a projection.
func (*MemoryAdapter) GetDiagnosticInfo ¶
func (a *MemoryAdapter) GetDiagnosticInfo(ctx context.Context) (*adapters.DiagnosticInfo, error)
GetDiagnosticInfo returns diagnostic information for the memory adapter.
func (*MemoryAdapter) GetEventStoreStats ¶
func (a *MemoryAdapter) GetEventStoreStats(ctx context.Context) (*adapters.EventStoreStats, error)
GetEventStoreStats returns aggregate statistics about the event store.
func (*MemoryAdapter) GetLastPosition ¶
func (a *MemoryAdapter) GetLastPosition(ctx context.Context) (uint64, error)
GetLastPosition returns the global position of the last stored event.
func (*MemoryAdapter) GetProjection ¶
func (a *MemoryAdapter) GetProjection(ctx context.Context, name string) (*adapters.ProjectionInfo, error)
GetProjection returns information about a specific projection.
func (*MemoryAdapter) GetProjectionHealth ¶
func (a *MemoryAdapter) GetProjectionHealth(ctx context.Context) (*adapters.ProjectionHealthResult, error)
GetProjectionHealth returns projection health status for memory adapter.
func (*MemoryAdapter) GetStreamEvents ¶
func (a *MemoryAdapter) GetStreamEvents(ctx context.Context, streamID string, fromVersion int64, limit int) ([]adapters.StoredEvent, error)
GetStreamEvents returns events from a stream with pagination for CLI display.
func (*MemoryAdapter) GetStreamInfo ¶
func (a *MemoryAdapter) GetStreamInfo(ctx context.Context, streamID string) (*adapters.StreamInfo, error)
GetStreamInfo returns metadata about a stream.
func (*MemoryAdapter) GetTotalEventCount ¶
func (a *MemoryAdapter) GetTotalEventCount(ctx context.Context) (int64, error)
GetTotalEventCount returns the highest global position.
func (*MemoryAdapter) Initialize ¶
func (a *MemoryAdapter) Initialize(ctx context.Context) error
Initialize is a no-op for the memory adapter.
func (*MemoryAdapter) ListProjections ¶
func (a *MemoryAdapter) ListProjections(ctx context.Context) ([]adapters.ProjectionInfo, error)
ListProjections returns all registered projections.
func (*MemoryAdapter) ListStreams ¶
func (a *MemoryAdapter) ListStreams(ctx context.Context, prefix string, limit int) ([]adapters.StreamSummary, error)
ListStreams returns a list of stream summaries for CLI display.
func (*MemoryAdapter) Load ¶
func (a *MemoryAdapter) Load(ctx context.Context, streamID string, fromVersion int64) ([]adapters.StoredEvent, error)
Load retrieves all events from a stream starting from the specified version.
func (*MemoryAdapter) LoadFromPosition ¶
func (a *MemoryAdapter) LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]adapters.StoredEvent, error)
LoadFromPosition loads events starting from a global position. This is used by projection engines to catch up on historical events.
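A projection catch-up loop built on this kind of call might look like the sketch below. The eventLog type is a hypothetical stand-in for the adapter, and the sketch assumes fromPosition is exclusive (events strictly after the checkpoint are returned); this page does not state whether the real method is inclusive or exclusive, so verify that before relying on it.

```go
package main

import "fmt"

// storedEvent is a reduced stand-in for adapters.StoredEvent.
type storedEvent struct{ GlobalPosition uint64 }

// eventLog is a hypothetical in-memory global log ordered by position.
type eventLog []storedEvent

// loadFromPosition returns up to limit events whose position is greater
// than from (exclusive start is an assumption of this sketch).
func (l eventLog) loadFromPosition(from uint64, limit int) []storedEvent {
	var out []storedEvent
	for _, ev := range l {
		if ev.GlobalPosition > from && len(out) < limit {
			out = append(out, ev)
		}
	}
	return out
}

// catchUp pages through the log starting at checkpoint and returns the
// new checkpoint together with the number of events handled.
func catchUp(l eventLog, checkpoint uint64, pageSize int) (uint64, int) {
	handled := 0
	if pageSize <= 0 {
		return checkpoint, handled
	}
	for {
		page := l.loadFromPosition(checkpoint, pageSize)
		for _, ev := range page {
			handled++ // a real projection would apply the event here
			checkpoint = ev.GlobalPosition
		}
		if len(page) < pageSize { // a short page means the log is drained
			return checkpoint, handled
		}
	}
}

func main() {
	var l eventLog
	for pos := uint64(1); pos <= 7; pos++ {
		l = append(l, storedEvent{GlobalPosition: pos})
	}
	checkpoint, handled := catchUp(l, 2, 3)
	fmt.Println(checkpoint, handled) // prints "7 5"
}
```

Persisting the checkpoint after each page (for example with SetCheckpoint) lets a restarted projection resume from where it stopped instead of replaying the whole log.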
func (*MemoryAdapter) LoadSnapshot ¶
func (a *MemoryAdapter) LoadSnapshot(ctx context.Context, streamID string) (*adapters.SnapshotRecord, error)
LoadSnapshot retrieves the latest snapshot for the given stream.
func (*MemoryAdapter) Migrate ¶ added in v1.0.25
func (a *MemoryAdapter) Migrate(ctx context.Context) error
Migrate runs pending schema migrations. The memory adapter requires no schema, so this is a no-op (it only honors context cancellation and closed state) and exists to satisfy the adapters.Migrator interface for parity with PostgreSQL.
func (*MemoryAdapter) MigrationVersion ¶ added in v1.0.25
func (a *MemoryAdapter) MigrationVersion(ctx context.Context) (int, error)
MigrationVersion returns the current migration version. The memory adapter has no evolving schema, so it reports a constant version of 1 whenever the adapter is usable. This satisfies the adapters.Migrator interface.
func (*MemoryAdapter) Ping ¶
func (a *MemoryAdapter) Ping(ctx context.Context) error
Ping checks if the adapter is healthy.
func (*MemoryAdapter) RecordMigration ¶
func (a *MemoryAdapter) RecordMigration(ctx context.Context, name string) error
RecordMigration marks a migration as applied.
func (*MemoryAdapter) RemoveMigrationRecord ¶
func (a *MemoryAdapter) RemoveMigrationRecord(ctx context.Context, name string) error
RemoveMigrationRecord removes a migration record (for rollback).
func (*MemoryAdapter) Reset ¶
func (a *MemoryAdapter) Reset()
Reset clears all data. Useful for testing.
func (*MemoryAdapter) ResetProjectionCheckpoint ¶
func (a *MemoryAdapter) ResetProjectionCheckpoint(ctx context.Context, name string) error
ResetProjectionCheckpoint resets a projection's position to 0 for rebuild.
func (*MemoryAdapter) SaveSnapshot ¶
func (a *MemoryAdapter) SaveSnapshot(ctx context.Context, streamID string, version int64, data []byte) error
SaveSnapshot stores a snapshot for the given stream.
func (*MemoryAdapter) SetCheckpoint ¶
func (a *MemoryAdapter) SetCheckpoint(ctx context.Context, projectionName string, position uint64) error
SetCheckpoint stores the last processed position for a projection.
func (*MemoryAdapter) SetProjectionStatus ¶
func (a *MemoryAdapter) SetProjectionStatus(ctx context.Context, name string, status string) error
SetProjectionStatus updates a projection's status.
func (*MemoryAdapter) StreamCount ¶
func (a *MemoryAdapter) StreamCount() int
StreamCount returns the number of streams.
func (*MemoryAdapter) SubscribeAll ¶
func (a *MemoryAdapter) SubscribeAll(ctx context.Context, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeAll subscribes to all events across all streams.
Live delivery is best-effort and lossy: each subscriber is served by a buffered channel (sized by SubscriptionOptions.BufferSize). Historical events are delivered synchronously while the subscription is being set up, but once the subscription is live, an event is dropped for any subscriber whose buffer is full at the moment of publication (the memory adapter never blocks the writer).
When an event is dropped, the subscription's OnError callback (if provided) is invoked so the caller can detect the loss. Consumers that must not miss events should drain the channel promptly, use a sufficiently large buffer, or rely on checkpoint-based catch-up via LoadFromPosition rather than purely on live delivery.
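The drop-on-full behavior is what a non-blocking channel send produces in Go. The sketch below illustrates that technique with hypothetical local types (event, subscriber, publish); it is not the adapter's code, and the error it passes to the callback is a plain formatted error rather than the package's SubscriptionDropError.

```go
package main

import "fmt"

// event is a reduced stand-in for adapters.StoredEvent.
type event struct{ Position uint64 }

// subscriber pairs a buffered channel with an optional error callback.
type subscriber struct {
	ch      chan event
	onError func(error)
}

// publish attempts a non-blocking send. If the buffer is full the event
// is dropped and the callback is told, so the writer never blocks.
func publish(sub *subscriber, ev event) bool {
	select {
	case sub.ch <- ev:
		return true
	default:
		if sub.onError != nil {
			sub.onError(fmt.Errorf("dropped event at position %d", ev.Position))
		}
		return false
	}
}

// demo publishes five events to a subscriber with a buffer of two that
// never drains its channel.
func demo() (delivered, dropped int) {
	sub := &subscriber{ch: make(chan event, 2)}
	sub.onError = func(error) { dropped++ }
	for pos := uint64(1); pos <= 5; pos++ {
		if publish(sub, event{Position: pos}) {
			delivered++
		}
	}
	return delivered, dropped
}

func main() {
	delivered, dropped := demo()
	fmt.Println(delivered, dropped) // prints "2 3"
}
```

The select with a default case is what keeps a slow consumer from stalling appends; the cost is that a subscriber must treat the channel as a hint and reconcile against stored positions when it needs completeness.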
func (*MemoryAdapter) SubscribeCategory ¶
func (a *MemoryAdapter) SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeCategory subscribes to all events from streams in a category.
func (*MemoryAdapter) SubscribeStream ¶
func (a *MemoryAdapter) SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...adapters.SubscriptionOptions) (<-chan adapters.StoredEvent, error)
SubscribeStream subscribes to events from a specific stream.
type OutboxStore ¶
type OutboxStore struct {
// contains filtered or unexported fields
}
OutboxStore provides an in-memory implementation of adapters.OutboxStore. This is primarily intended for testing and development purposes. Note: ScheduleInTx ignores the tx parameter since in-memory storage has no real transactions.
func NewOutboxStore ¶
func NewOutboxStore() *OutboxStore
NewOutboxStore creates a new in-memory OutboxStore.
func (*OutboxStore) Clear ¶
func (s *OutboxStore) Clear()
Clear removes all messages (useful for testing).
func (*OutboxStore) Close ¶
func (s *OutboxStore) Close() error
Close is a no-op for the in-memory store.
func (*OutboxStore) Count ¶
func (s *OutboxStore) Count() int
Count returns the total number of messages stored.
func (*OutboxStore) CountByStatus ¶
func (s *OutboxStore) CountByStatus() map[adapters.OutboxStatus]int
CountByStatus returns the count of messages by status.
func (*OutboxStore) FetchPending ¶
func (s *OutboxStore) FetchPending(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
FetchPending atomically claims up to limit pending messages for processing.
func (*OutboxStore) GetDeadLetterMessages ¶
func (s *OutboxStore) GetDeadLetterMessages(ctx context.Context, limit int) ([]*adapters.OutboxMessage, error)
GetDeadLetterMessages retrieves dead-lettered messages.
func (*OutboxStore) Initialize ¶
func (s *OutboxStore) Initialize(ctx context.Context) error
Initialize is a no-op for the in-memory store.
func (*OutboxStore) MarkCompleted ¶
func (s *OutboxStore) MarkCompleted(ctx context.Context, ids []string) error
MarkCompleted marks messages as successfully delivered.
func (*OutboxStore) MarkFailed ¶
func (s *OutboxStore) MarkFailed(ctx context.Context, id string, lastErr error) error
MarkFailed marks a message as failed with an error description.
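func (*OutboxStore) MoveToDeadLetter ¶
func (s *OutboxStore) MoveToDeadLetter(ctx context.Context, maxAttempts int) (int64, error)
MoveToDeadLetter moves messages that have exceeded their per-message max_attempts or the global maxAttempts to the dead-letter state.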
func (*OutboxStore) ReclaimStale ¶ added in v1.0.25
func (s *OutboxStore) ReclaimStale(ctx context.Context, olderThan time.Duration) (int64, error)
ReclaimStale resets messages stuck in OutboxProcessing whose last attempt is older than olderThan back to pending, recovering messages orphaned when a processor crashes between claiming and marking a message.
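The reclaim rule can be sketched as a scan that flips stale processing messages back to pending. The message type, its LastAttemptAt field, the status values, and the explicit now parameter are hypothetical; only the rule itself (processing and last attempt older than olderThan) comes from the description above.

```go
package main

import (
	"fmt"
	"time"
)

type status string

const (
	pending    status = "pending"
	processing status = "processing"
)

// message is a reduced, hypothetical stand-in for adapters.OutboxMessage.
type message struct {
	ID            string
	Status        status
	LastAttemptAt time.Time
}

// reclaimStale resets processing messages whose last attempt is older
// than olderThan back to pending and returns how many were reset.
func reclaimStale(msgs []*message, olderThan time.Duration, now time.Time) int64 {
	cutoff := now.Add(-olderThan)
	var reclaimed int64
	for _, m := range msgs {
		if m.Status == processing && m.LastAttemptAt.Before(cutoff) {
			m.Status = pending
			reclaimed++
		}
	}
	return reclaimed
}

// demo reclaims with a five-minute threshold over one stale and one
// recently claimed message.
func demo() (int64, status, status) {
	now := time.Now()
	msgs := []*message{
		{ID: "stale", Status: processing, LastAttemptAt: now.Add(-10 * time.Minute)},
		{ID: "fresh", Status: processing, LastAttemptAt: now.Add(-10 * time.Second)},
	}
	n := reclaimStale(msgs, 5*time.Minute, now)
	return n, msgs[0].Status, msgs[1].Status
}

func main() {
	n, stale, fresh := demo()
	fmt.Println(n, stale, fresh) // prints "1 pending processing"
}
```

The threshold should comfortably exceed the longest expected processing time; otherwise a message that is still being handled could be reclaimed and delivered twice.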
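func (*OutboxStore) RetryFailed ¶
func (s *OutboxStore) RetryFailed(ctx context.Context, maxAttempts int) (int64, error)
RetryFailed resets eligible failed messages (those still below their per-message max_attempts or the global maxAttempts) to pending.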
func (*OutboxStore) Schedule ¶
func (s *OutboxStore) Schedule(ctx context.Context, messages []*adapters.OutboxMessage) error
Schedule stores outbox messages for later processing.
func (*OutboxStore) ScheduleInTx ¶
func (s *OutboxStore) ScheduleInTx(ctx context.Context, _ interface{}, messages []*adapters.OutboxMessage) error
ScheduleInTx stores outbox messages (tx parameter is ignored for in-memory store).
type SagaStore ¶
type SagaStore struct {
// contains filtered or unexported fields
}
SagaStore provides an in-memory implementation of adapters.SagaStore. This is primarily intended for testing and development purposes.
func (*SagaStore) CountByStatus ¶
func (s *SagaStore) CountByStatus(ctx context.Context) (map[adapters.SagaStatus]int64, error)
CountByStatus returns the count of sagas by status.
func (*SagaStore) FindByCorrelationID ¶
func (s *SagaStore) FindByCorrelationID(ctx context.Context, correlationID string) (*adapters.SagaState, error)
FindByCorrelationID finds a saga by its correlation ID.
func (*SagaStore) FindByType ¶
func (s *SagaStore) FindByType(ctx context.Context, sagaType string, statuses ...adapters.SagaStatus) ([]*adapters.SagaState, error)
FindByType finds all sagas of a given type with the specified statuses.
func (*SagaStore) Save ¶
func (s *SagaStore) Save(ctx context.Context, state *adapters.SagaState) error
Save persists a saga state with optimistic concurrency control.
Version semantics:
- Version 0: Creates a new saga. If a saga with this ID already exists, it returns a ConcurrencyError.
- Version > 0: Updates an existing saga. If no saga exists with this ID, it returns SagaNotFoundError. If the version doesn't match, it returns ConcurrencyError.
After a successful save, state.Version is incremented to reflect the new version.
Reject-on-missing contract: a Version > 0 save against a missing saga is rejected with SagaNotFoundError rather than silently creating the saga. This is intentional and is the stricter of the two possible behaviors: a non-zero version is a claim that a prior version already exists, so creating one here would mask a lost-write or an out-of-order Save. Callers that mean to create a saga must start from Version 0. Note this is intentionally stricter than the PostgreSQL adapter, whose upsert-style Save may create the row in this case; code that must behave identically across adapters should always create with Version 0 first.
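The version rules above can be sketched as a small decision table. The types and the two sentinel errors below are local stand-ins for adapters.SagaState, ConcurrencyError, and SagaNotFoundError; locking and negative versions are left out for brevity, so treat this as an illustration of the contract rather than the store's code.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errConcurrency  = errors.New("concurrency conflict") // stand-in for ConcurrencyError
	errSagaNotFound = errors.New("saga not found")       // stand-in for SagaNotFoundError
)

// sagaState is a reduced stand-in for adapters.SagaState.
type sagaState struct {
	ID      string
	Version int64
}

// sagaStore maps saga ID to the currently stored version.
type sagaStore struct{ sagas map[string]int64 }

// save applies the documented rules: Version 0 creates, Version > 0
// updates an existing saga, and a missing saga is never created
// implicitly by a non-zero version.
func (s *sagaStore) save(state *sagaState) error {
	stored, exists := s.sagas[state.ID]
	switch {
	case state.Version == 0 && exists:
		return errConcurrency
	case state.Version > 0 && !exists:
		return errSagaNotFound
	case state.Version > 0 && stored != state.Version:
		return errConcurrency
	}
	state.Version++ // reflect the new version on the caller's state
	s.sagas[state.ID] = state.Version
	return nil
}

func main() {
	s := &sagaStore{sagas: make(map[string]int64)}
	st := &sagaState{ID: "order-42"}
	fmt.Println(s.save(st), st.Version)                            // <nil> 1
	fmt.Println(s.save(&sagaState{ID: "order-42"}))                // concurrency conflict
	fmt.Println(s.save(&sagaState{ID: "missing", Version: 3}))     // saga not found
	fmt.Println(s.save(st), st.Version)                            // <nil> 2
	fmt.Println(s.save(&sagaState{ID: "order-42", Version: 1}))    // concurrency conflict
}
```

Code that must behave the same on every adapter can follow the pattern in main: create with Version 0 first, then keep reusing the state whose Version the store has advanced.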
type StreamNotFoundError ¶
type StreamNotFoundError = adapters.StreamNotFoundError
StreamNotFoundError is an alias for adapters.StreamNotFoundError for backward compatibility.
type SubscriptionDropError ¶ added in v1.0.25
type SubscriptionDropError struct {
// StreamID is the stream the dropped event belonged to.
StreamID string
// GlobalPosition is the global position of the dropped event.
GlobalPosition uint64
}
SubscriptionDropError reports that a live event could not be delivered to a subscriber because its channel buffer was full and the event was dropped. It is passed to SubscriptionOptions.OnError (when provided) so callers can detect lossy live delivery from the memory adapter. The identifying fields pinpoint the dropped event so the caller may, for example, trigger a checkpoint-based catch-up from GlobalPosition.
func (*SubscriptionDropError) Error ¶ added in v1.0.25
func (e *SubscriptionDropError) Error() string
Error returns the error message.
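An OnError callback can recognize a drop with errors.As and use GlobalPosition as the starting point for a catch-up. The sketch below redeclares the struct locally so it compiles on its own; the message format and the catchUpFrom helper are assumptions, and it assumes the adapter passes the error as a *SubscriptionDropError (consistent with the pointer receiver on Error).

```go
package main

import (
	"errors"
	"fmt"
)

// SubscriptionDropError mirrors the fields documented above. The message
// format below is an assumption of this sketch.
type SubscriptionDropError struct {
	StreamID       string
	GlobalPosition uint64
}

func (e *SubscriptionDropError) Error() string {
	return fmt.Sprintf("dropped event at position %d (stream %s)", e.GlobalPosition, e.StreamID)
}

// catchUpFrom is a hypothetical helper: it reports the global position
// of the dropped event when err is (or wraps) a drop error.
func catchUpFrom(err error) (uint64, bool) {
	var dropErr *SubscriptionDropError
	if errors.As(err, &dropErr) {
		return dropErr.GlobalPosition, true
	}
	return 0, false
}

func main() {
	err := fmt.Errorf("subscription: %w", &SubscriptionDropError{StreamID: "order-42", GlobalPosition: 17})
	pos, ok := catchUpFrom(err)
	fmt.Println(pos, ok) // prints "17 true"
}
```

A callback would typically record the lowest dropped position it has seen and hand it to a catch-up routine, rather than reloading events from inside the callback itself.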