Documentation ¶
Overview ¶
Package replay is a typed event-sourced replay framework: append every state-changing event to a log; reconstruct state at any point by replaying from an optional checkpoint.
Use cases:
- Portfolio reconstruction and forensic analysis (trading).
- Match replay, encounter reconstruction, anti-cheat (games).
- Provable audit trails (compliance).
- Reproducing a production bug from real events (debugging).
The package ships an in-memory Log and Snapshotter for tests; PostgreSQL- or S3-backed implementations are pluggable behind the Log and Snapshotter interfaces.
Constants ¶
This section is empty.
Variables ¶
var (
	ErrSeqMismatch = errors.New("replay: seq does not match next expected")
	ErrEmptyStream = errors.New("replay: stream has no events")
)
Errors surfaced by Log implementations.
Functions ¶
func Replay ¶
func Replay[S, E any](
	ctx context.Context,
	log Log[E],
	snap Snapshotter[S],
	stream StreamKey,
	targetSeq Seq,
	apply Apply[S, E],
	initial S,
	opts Options,
) (S, error)
Replay reconstructs state at targetSeq by loading the latest checkpoint at or before targetSeq (if any), then folding the events after checkpoint.Seq, up to and including targetSeq, through apply.
If snap is nil or no checkpoint exists, replay starts from initial. If targetSeq == 0, the entire stream is replayed.
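The fold described above can be sketched over an in-memory slice. This is only an illustration, not the package's implementation: the Event and Checkpoint field names, the helper replaySketch, and the simplified apply signature (no error return, no context) are all assumptions, and events are assumed to be sorted by ascending Seq.

```go
package main

import "fmt"

// Seq mirrors the package's Seq type.
type Seq uint64

// Event and Checkpoint are simplified stand-ins; their field names are
// assumptions, not the package's actual definitions.
type Event[E any] struct {
	Seq     Seq
	Payload E
}

type Checkpoint[S any] struct {
	Seq   Seq
	State S
}

// replaySketch (hypothetical) folds events with cp.Seq < Seq <= target
// through apply. A nil cp starts from initial; target == 0 replays the
// whole stream. Events must be sorted by ascending Seq.
func replaySketch[S, E any](events []Event[E], cp *Checkpoint[S], target Seq,
	apply func(S, Event[E]) S, initial S) S {
	state, after := initial, Seq(0)
	if cp != nil {
		state, after = cp.State, cp.Seq
	}
	for _, ev := range events {
		if ev.Seq <= after {
			continue // already reflected in the checkpoint
		}
		if target != 0 && ev.Seq > target {
			break // past the requested point in the stream
		}
		state = apply(state, ev)
	}
	return state
}

// sum is a hypothetical apply function: the state is a running total.
func sum(s int, ev Event[int]) int { return s + ev.Payload }

func main() {
	events := []Event[int]{
		{Seq: 1, Payload: 10}, {Seq: 2, Payload: 20},
		{Seq: 3, Payload: 30}, {Seq: 4, Payload: 40},
	}
	cp := &Checkpoint[int]{Seq: 2, State: 30}

	// Resume from the checkpoint at Seq 2 and stop at Seq 3.
	fmt.Println(replaySketch(events, cp, 3, sum, 0)) // 60
	// No checkpoint, target 0: fold the entire stream.
	fmt.Println(replaySketch[int, int](events, nil, 0, sum, 0)) // 100
}
```

Resuming from the checkpoint and replaying from scratch to the same target give the same state, which is the property that makes checkpoints a safe optimization.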
Types ¶
type Apply ¶
Apply folds a single Event into a state to produce the next state. Implementations must be pure (deterministic, no I/O); replay correctness depends on it.
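As an illustration of the purity requirement, here is a minimal sketch of an apply function for a hypothetical account balance. The declaration of Apply is not shown in this documentation, so the signature used here (state and event in, next state and error out), the Event fields, and the Deposit and Balance types are all assumptions.

```go
package main

import "fmt"

// Seq mirrors the package's Seq type.
type Seq uint64

// Event is a stand-in for the package's Event[E]; field names are assumed.
type Event[E any] struct {
	Seq     Seq
	Payload E
}

// Deposit and Balance are hypothetical payload and state types.
type Deposit struct{ Amount int64 }
type Balance struct{ Total int64 }

// applyDeposit is pure: its result depends only on its arguments. It reads
// no clock, no random source, and no external storage, so replaying the
// same events always yields the same state.
func applyDeposit(s Balance, ev Event[Deposit]) (Balance, error) {
	if ev.Payload.Amount < 0 {
		return s, fmt.Errorf("negative deposit at seq %d", ev.Seq)
	}
	s.Total += ev.Payload.Amount // s is a copy; the caller's value is untouched
	return s, nil
}

func main() {
	state := Balance{}
	for i, amt := range []int64{10, 20, 12} {
		var err error
		state, err = applyDeposit(state, Event[Deposit]{Seq: Seq(i + 1), Payload: Deposit{Amount: amt}})
		if err != nil {
			panic(err)
		}
	}
	fmt.Println(state.Total) // 42
}
```

Anything nondeterministic that the state needs, such as a timestamp, would have to be carried in the event payload rather than read inside the function.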
type Checkpoint ¶
Checkpoint captures a derived state at a specific seq for a stream.
type Log ¶
type Log[E any] interface {
	// Append writes an event. The implementation assigns the Seq if
	// the caller passes 0; otherwise the implementation must verify
	// the Seq is the next expected value for the stream.
	Append(ctx context.Context, ev Event[E]) (Seq, error)

	// Read returns up to `limit` events for `stream` starting at Seq
	// >= `from`. Returns an empty slice when there are no more events.
	Read(ctx context.Context, stream StreamKey, from Seq, limit int) ([]Event[E], error)
}
Log is an append-only, per-stream event log.
type MemoryLog ¶
type MemoryLog[E any] struct {
	// contains filtered or unexported fields
}
MemoryLog is an in-process Log[E]. Goroutine-safe.
func NewMemoryLog ¶
NewMemoryLog constructs an empty MemoryLog.
type MemorySnapshotter ¶
type MemorySnapshotter[S any] struct {
	// contains filtered or unexported fields
}
MemorySnapshotter is an in-process Snapshotter[S]. Goroutine-safe.
func NewMemorySnapshotter ¶
func NewMemorySnapshotter[S any]() *MemorySnapshotter[S]
NewMemorySnapshotter constructs an empty MemorySnapshotter.
func (*MemorySnapshotter[S]) Load ¶
func (s *MemorySnapshotter[S]) Load(_ context.Context, stream StreamKey, atOrBefore Seq) (*Checkpoint[S], error)
Load implements Snapshotter.
func (*MemorySnapshotter[S]) Save ¶
func (s *MemorySnapshotter[S]) Save(_ context.Context, c Checkpoint[S]) error
Save implements Snapshotter.
type Options ¶
type Options struct {
// BatchSize is the chunk size used when reading from the log.
// Defaults to 1024.
BatchSize int
// SnapshotEvery is the interval N: after every N applied events,
// the current state is offered for persisting as a checkpoint
// mid-replay. N == 0 disables.
SnapshotEvery int
}
Options tunes a Replay invocation.
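One way the two fields might be interpreted inside a replay loop is sketched below. The helper names effectiveBatchSize and snapshotDue are hypothetical, and treating a non-positive BatchSize as "use the default" is an assumption; the documentation only states the default of 1024 and that N == 0 disables snapshots.

```go
package main

import "fmt"

// Options mirrors the package's Options struct.
type Options struct {
	BatchSize     int
	SnapshotEvery int
}

// defaultBatchSize is the documented default read chunk size.
const defaultBatchSize = 1024

// effectiveBatchSize (hypothetical) falls back to the default when
// BatchSize is unset or non-positive.
func effectiveBatchSize(o Options) int {
	if o.BatchSize <= 0 {
		return defaultBatchSize
	}
	return o.BatchSize
}

// snapshotDue (hypothetical) reports whether a checkpoint is due after
// `applied` events have been folded. SnapshotEvery == 0 disables it.
func snapshotDue(o Options, applied int) bool {
	return o.SnapshotEvery > 0 && applied > 0 && applied%o.SnapshotEvery == 0
}

func main() {
	o := Options{SnapshotEvery: 500}
	fmt.Println(effectiveBatchSize(o))        // 1024
	fmt.Println(snapshotDue(o, 1000))         // true
	fmt.Println(snapshotDue(o, 1001))         // false
	fmt.Println(snapshotDue(Options{}, 1000)) // false
}
```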
type Seq ¶
type Seq uint64
Seq is the monotonically increasing sequence number within a stream. Starts at 1; Seq == 0 means "before the stream began".
type Snapshotter ¶
type Snapshotter[S any] interface {
	Save(ctx context.Context, c Checkpoint[S]) error

	// Load returns the most recent checkpoint with Seq <= atOrBefore.
	// Returns (nil, nil) if no such checkpoint exists.
	Load(ctx context.Context, stream StreamKey, atOrBefore Seq) (*Checkpoint[S], error)
}
Snapshotter persists derived-state checkpoints so replays don't have to start from genesis every time.
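The Load lookup rule (most recent checkpoint with Seq <= atOrBefore, or nil when none qualifies) can be sketched as a binary search over checkpoints sorted by Seq. The Checkpoint field names and the helper loadAtOrBefore are assumptions for the example; how MemorySnapshotter stores checkpoints internally is not documented here.

```go
package main

import (
	"fmt"
	"sort"
)

type Seq uint64

// Checkpoint is a stand-in for the package's Checkpoint[S]; the State
// field name is an assumption.
type Checkpoint[S any] struct {
	Seq   Seq
	State S
}

// loadAtOrBefore (hypothetical) returns the latest checkpoint with
// Seq <= atOrBefore, or nil if none exists. cps must be sorted by
// ascending Seq.
func loadAtOrBefore[S any](cps []Checkpoint[S], atOrBefore Seq) *Checkpoint[S] {
	// i is the index of the first checkpoint strictly after atOrBefore.
	i := sort.Search(len(cps), func(i int) bool { return cps[i].Seq > atOrBefore })
	if i == 0 {
		return nil // every checkpoint is too new, or there are none
	}
	c := cps[i-1] // copy so the caller cannot mutate stored data
	return &c
}

func main() {
	cps := []Checkpoint[string]{
		{Seq: 100, State: "state@100"},
		{Seq: 200, State: "state@200"},
	}
	fmt.Println(loadAtOrBefore(cps, 150).State) // state@100
	fmt.Println(loadAtOrBefore(cps, 200).State) // state@200
	fmt.Println(loadAtOrBefore(cps, 50) == nil) // true
}
```

A caller that receives nil falls back to the initial state and replays from the start of the stream, matching the behavior described for Replay.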