replay

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Published: May 30, 2026 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package replay is a typed event-sourced replay framework: append every state-changing event to a log; reconstruct state at any point by replaying from an optional checkpoint.

Use cases:

  • Portfolio reconstruction and forensic analysis (trading).
  • Match replay, encounter reconstruction, anti-cheat (games).
  • Provable audit trails (compliance).
  • Reproducing a production bug from real events (debugging).

The package ships an in-memory Log and Snapshotter for tests; implementations backed by PostgreSQL or S3 can be plugged in behind the Log and Snapshotter interfaces.

Constants

This section is empty.

Variables

var (
	ErrSeqMismatch = errors.New("replay: seq does not match next expected")
	ErrEmptyStream = errors.New("replay: stream has no events")
)

Errors surfaced by Log implementations.

Functions

func Replay

func Replay[S, E any](
	ctx context.Context,
	log Log[E],
	snap Snapshotter[S],
	stream StreamKey,
	targetSeq Seq,
	apply Apply[S, E],
	initial S,
	opts Options,
) (S, error)

Replay reconstructs state at targetSeq by loading the latest checkpoint at or before targetSeq (if any), then folding the events after checkpoint.Seq, up to and including targetSeq, through apply.

If snap is nil or no checkpoint exists, replay starts from initial. If targetSeq == 0 the entire stream is replayed.
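
The control flow described above can be sketched as a batched fold. This is an illustration under stated assumptions, not the package's implementation: the types are trimmed local stand-ins, reader is a hypothetical simplification of Log.Read (no context, stream key, or error), and the bounds are assumed to be exclusive of checkpoint.Seq and inclusive of targetSeq.

```go
package main

import "fmt"

// Local stand-ins for the documented types, trimmed to the fields used here.
type Seq uint64

type Event[E any] struct {
	Seq     Seq
	Payload E
}

type Checkpoint[S any] struct {
	Seq   Seq
	State S
}

// reader is a simplified Log.Read: no context, stream key, or error.
type reader[E any] func(from Seq, limit int) []Event[E]

// replaySketch starts from cp (or from initial when cp is nil) and folds
// every event with cp.Seq < Seq <= target. target == 0 replays to the end.
func replaySketch[S, E any](read reader[E], cp *Checkpoint[S], target Seq,
	apply func(S, Event[E]) S, initial S, batch int) S {
	state, next := initial, Seq(1)
	if cp != nil {
		state, next = cp.State, cp.Seq+1
	}
	for {
		evs := read(next, batch)
		if len(evs) == 0 {
			return state // log exhausted
		}
		for _, ev := range evs {
			if target != 0 && ev.Seq > target {
				return state // reached the requested point
			}
			state = apply(state, ev)
			next = ev.Seq + 1
		}
	}
}

// sliceReader serves events from a slice whose i-th element has Seq i+1.
func sliceReader[E any](evs []Event[E]) reader[E] {
	return func(from Seq, limit int) []Event[E] {
		if from == 0 {
			from = 1
		}
		if from > Seq(len(evs)) {
			return nil
		}
		start := int(from) - 1
		end := start + limit
		if end > len(evs) {
			end = len(evs)
		}
		return evs[start:end]
	}
}

func main() {
	read := sliceReader([]Event[int]{{1, 10}, {2, -3}, {3, 5}, {4, 8}})
	sum := func(s int, ev Event[int]) int { return s + ev.Payload }

	var none *Checkpoint[int]
	fmt.Println(replaySketch(read, none, 3, sum, 0, 2)) // 12

	cp := &Checkpoint[int]{Seq: 2, State: 7}
	fmt.Println(replaySketch(read, cp, 0, sum, 0, 2)) // 20
}
```

The second call skips events 1 and 2 entirely because the checkpoint already accounts for them, which is the saving a Snapshotter is meant to provide.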

Types

type Apply

type Apply[S, E any] func(state S, ev Event[E]) S

Apply folds a single Event into a state to produce the next state. Implementations must be pure (no I/O, deterministic) — replay correctness depends on it.

type Checkpoint

type Checkpoint[S any] struct {
	Stream StreamKey
	Seq    Seq
	State  S
}

Checkpoint captures a derived state at a specific seq for a stream.

type Event

type Event[E any] struct {
	Stream  StreamKey
	Seq     Seq
	Time    time.Time
	Payload E
}

Event is one entry in the log.

type Log

type Log[E any] interface {
	// Append writes an event. The implementation assigns the Seq if
	// the caller passes 0; otherwise the implementation must verify
	// the Seq is the next expected value for the stream.
	Append(ctx context.Context, ev Event[E]) (Seq, error)

	// Read returns up to `limit` events for `stream` starting at Seq
	// >= `from`. Returns an empty slice when there are no more events.
	Read(ctx context.Context, stream StreamKey, from Seq, limit int) ([]Event[E], error)
}

Log is an append-only, per-stream event log.

type MemoryLog

type MemoryLog[E any] struct {
	// contains filtered or unexported fields
}

MemoryLog is an in-process Log[E]. Goroutine-safe.

func NewMemoryLog

func NewMemoryLog[E any]() *MemoryLog[E]

NewMemoryLog constructs an empty MemoryLog.

func (*MemoryLog[E]) Append

func (l *MemoryLog[E]) Append(_ context.Context, ev Event[E]) (Seq, error)

Append implements Log.

func (*MemoryLog[E]) Latest

func (l *MemoryLog[E]) Latest(stream StreamKey) Seq

Latest returns the highest Seq currently appended to stream, or 0 if empty.

func (*MemoryLog[E]) Read

func (l *MemoryLog[E]) Read(_ context.Context, stream StreamKey, from Seq, limit int) ([]Event[E], error)

Read implements Log.

type MemorySnapshotter

type MemorySnapshotter[S any] struct {
	// contains filtered or unexported fields
}

MemorySnapshotter is an in-process Snapshotter[S]. Goroutine-safe.

func NewMemorySnapshotter

func NewMemorySnapshotter[S any]() *MemorySnapshotter[S]

NewMemorySnapshotter constructs an empty MemorySnapshotter.

func (*MemorySnapshotter[S]) Load

func (s *MemorySnapshotter[S]) Load(_ context.Context, stream StreamKey, atOrBefore Seq) (*Checkpoint[S], error)

Load implements Snapshotter.

func (*MemorySnapshotter[S]) Save

func (s *MemorySnapshotter[S]) Save(_ context.Context, c Checkpoint[S]) error

Save implements Snapshotter.

type Options

type Options struct {
	// BatchSize is the chunk size used when reading from the log.
	// Defaults to 1024.
	BatchSize int
	// SnapshotEvery is the interval N: after every N applied events,
	// a checkpoint of the current state can be persisted mid-replay.
	// N == 0 disables.
	SnapshotEvery int
}

Options tunes a Replay invocation.
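
How these fields might be interpreted can be sketched as follows. Both helpers are hypothetical and encode assumptions this page does not confirm: that a non-positive BatchSize selects the documented default of 1024, and that "every N events" counts events applied during the current replay.

```go
package main

import "fmt"

// Options mirrors the documented struct.
type Options struct {
	BatchSize     int
	SnapshotEvery int
}

// withDefaults is a hypothetical helper: it assumes a non-positive
// BatchSize selects the documented default of 1024.
func withDefaults(o Options) Options {
	if o.BatchSize <= 0 {
		o.BatchSize = 1024
	}
	return o
}

// snapshotDue is a hypothetical helper: it assumes "every N events" counts
// events applied during this replay, and that N == 0 disables snapshots.
func snapshotDue(o Options, applied int) bool {
	return o.SnapshotEvery > 0 && applied > 0 && applied%o.SnapshotEvery == 0
}

func main() {
	o := withDefaults(Options{SnapshotEvery: 500})
	fmt.Println(o.BatchSize, snapshotDue(o, 1000), snapshotDue(o, 750)) // 1024 true false
}
```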

type Seq

type Seq uint64

Seq is the monotonically-increasing sequence number within a stream. Starts at 1; Seq == 0 means "before the stream began".

type Snapshotter

type Snapshotter[S any] interface {
	Save(ctx context.Context, c Checkpoint[S]) error
	// Load returns the most recent checkpoint with Seq <= atOrBefore.
	// Returns (nil, nil) if no such checkpoint exists.
	Load(ctx context.Context, stream StreamKey, atOrBefore Seq) (*Checkpoint[S], error)
}

Snapshotter persists derived-state checkpoints so replays don't have to start from genesis every time.

type StreamKey

type StreamKey string

StreamKey identifies an event stream (account, match id, tenant, etc.).
