eventlog

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 31, 2025 License: BSD-3-Clause Imports: 12 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrUnsupportedCheck = errors.New("unsupported version check type")
	ErrNoEvents         = errors.New("empty events")
)

Functions

This section is empty.

Types

type MemTx

type MemTx struct{}

MemTx is a dummy transaction handle for the in-memory store. Its presence in a function signature indicates that the function must be called within the critical section managed by WithinTx.

type Memory

type Memory struct {
	// contains filtered or unexported fields
}

func NewMemory

func NewMemory(opts ...MemoryOption) *Memory

NewMemory creates a new in-memory event store.

func (*Memory) AppendEvents

func (mem *Memory) AppendEvents(
	ctx context.Context,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, error)

func (*Memory) AppendInTx

func (mem *Memory) AppendInTx(
	ctx context.Context,
	_ MemTx,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, []*event.Record, error)

func (*Memory) DangerouslyDeleteEventsUpTo added in v0.5.0

func (mem *Memory) DangerouslyDeleteEventsUpTo(
	ctx context.Context,
	id event.LogID,
	version version.Version,
) error

func (*Memory) ReadAllEvents

func (mem *Memory) ReadAllEvents(
	ctx context.Context,
	globalSelector version.Selector,
) event.GlobalRecords

ReadAllEvents returns an iterator over all events in the store. If the store was created with WithMemoryGlobalTailing(), the iterator first yields all existing events and then blocks indefinitely, yielding new events as they are appended.

func (*Memory) ReadEvents

func (store *Memory) ReadEvents(
	ctx context.Context,
	id event.LogID,
	selector version.Selector,
) event.Records

func (*Memory) WithinTx

func (mem *Memory) WithinTx(
	ctx context.Context,
	fn func(ctx context.Context, tx MemTx) error,
) error

WithinTx executes the given function within a mutex-protected critical section, simulating a transaction. Note: This simple implementation does not support rollback on error; changes made to the store before an error occurs within the function will persist.

type MemoryOption added in v0.5.0

type MemoryOption func(*Memory)

func WithMemoryGlobalTailing added in v0.5.0

func WithMemoryGlobalTailing() MemoryOption

WithMemoryGlobalTailing enables the "tailing" or "subscription" mode for ReadAllEvents. When enabled, the iterator will block and wait for new events after reading all historical ones, behaving like a channel.

type Postgres

type Postgres struct {
	// contains filtered or unexported fields
}

Postgres is an implementation of event.Log for a PostgreSQL database. It uses a dedicated table for events and a PL/pgSQL function with a trigger to enforce optimistic concurrency control at the database level. This approach is highly reliable as it prevents race conditions during writes.

See `NewPostgres` for initialization.

func NewPostgres

func NewPostgres(db *sql.DB, opts ...PostgresOption) (*Postgres, error)

NewPostgres creates a new Postgres event log. Upon initialization, it ensures that the necessary database schema (table, function, and trigger) is created. This setup is performed within a transaction, making it safe to call on application startup.

IMPORTANT: By default, this log uses a JSONB column and expects a JSON-based encoder (e.g., codec.NewJSONB()) to be configured in the repository. Modify the migrations or create your own implementation of a store if you want a different format.

Usage:

db, err := sql.Open("postgres", "user=... password=... dbname=... sslmode=disable")
if err != nil {
    log.Fatal(err)
}
pgLog, err := eventlog.NewPostgres(db)
if err != nil {
    log.Fatal(err)
}

Returns a configured `*Postgres` instance or an error if the setup fails.

func (*Postgres) AppendEvents

func (p *Postgres) AppendEvents(
	ctx context.Context,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, error)

AppendEvents writes a batch of raw events for a given aggregate ID to the log. It wraps the entire operation in a new database transaction to ensure atomicity.

Usage:

newVersion, err := pgLog.AppendEvents(ctx, logID, expectedVersion, rawEvents)
if err != nil {
    var conflictErr *version.ConflictError
    if errors.As(err, &conflictErr) {
        // handle optimistic concurrency failure
    }
}

Returns the new version of the aggregate after the append, or an error. A `version.ConflictError` is returned if the expected version does not match.

func (*Postgres) AppendInTx

func (p *Postgres) AppendInTx(
	ctx context.Context,
	tx *sql.Tx,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, []*event.Record, error)

AppendInTx writes events within an existing database transaction. It relies on the `trg_chronicle_check_event_version` trigger in the database to perform the optimistic concurrency check. If the check fails, the trigger raises an exception which is parsed into a `version.ConflictError`.

This method is primarily for internal use by `TransactionalRepository` or advanced scenarios.

Returns the new aggregate version, the records that were created, and an error if the operation fails.

func (*Postgres) DangerouslyDeleteEventsUpTo added in v0.5.0

func (p *Postgres) DangerouslyDeleteEventsUpTo(
	ctx context.Context,
	id event.LogID,
	version version.Version,
) error

⚠️⚠️⚠️ WARNING: Read carefully

DangerouslyDeleteEventsUpTo permanently deletes all events for a specific log ID up to and INCLUDING the specified version.

This operation is irreversible and breaks the immutability of the event log.

It is intended for use cases such as manually pruning event streams, and should be used with extreme caution.

Rebuilding aggregates or projections after this operation may lead to an inconsistent state.

It is recommended to call this only after generating a snapshot event of your aggregate state. Remember to also invalidate projections that depend on the deleted events, and any snapshots older than the version you pass to this function.
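
The inclusive boundary is easy to get wrong, so here is a small in-memory sketch of the semantics. The `record` type and `deleteUpTo` function are hypothetical and only model the "up to and INCLUDING" rule; they do not touch a database.

```go
package main

import "fmt"

// record is a hypothetical event record carrying only a version and a name.
type record struct {
	Version uint64
	Name    string
}

// deleteUpTo returns the records whose version is strictly greater than
// upTo, i.e. it drops everything up to and including upTo.
func deleteUpTo(records []record, upTo uint64) []record {
	kept := make([]record, 0, len(records))
	for _, r := range records {
		if r.Version > upTo {
			kept = append(kept, r)
		}
	}
	return kept
}

func main() {
	stream := []record{{1, "opened"}, {2, "deposited"}, {3, "snapshot"}, {4, "withdrew"}}
	// Prune through version 2 so that the snapshot at version 3 survives
	// and becomes the first event in the stream.
	fmt.Println(deleteUpTo(stream, 2))
}
```

Passing the snapshot's own version instead would delete the snapshot too, which is the mistake the warning above is meant to prevent.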

func (*Postgres) ReadAllEvents

func (p *Postgres) ReadAllEvents(
	ctx context.Context,
	globalSelector version.Selector,
) event.GlobalRecords

ReadAllEvents retrieves the global stream of all events across all aggregates, ordered chronologically by their global sequence number. This is useful for building projections or other system-wide consumers.

Usage:

globalRecords := pgLog.ReadAllEvents(ctx, version.Selector{From: 1})
for gRecord, err := range globalRecords {
    if err != nil {
        // handle the read error and stop iterating
        break
    }
    _ = gRecord // process global record for a projection
}

Returns an `event.GlobalRecords` iterator.

func (*Postgres) ReadEvents

func (p *Postgres) ReadEvents(
	ctx context.Context,
	id event.LogID,
	selector version.Selector,
) event.Records

ReadEvents retrieves the event history for a single aggregate, starting from a specified version. It returns an iterator for efficiently processing the stream.

Usage:

records := pgLog.ReadEvents(ctx, logID, version.SelectFromBeginning)
for record, err := range records {
    if err != nil {
        // handle the read error and stop iterating
        break
    }
    _ = record // process record
}

Returns an `event.Records` iterator.

func (*Postgres) WithinTx

func (p *Postgres) WithinTx(
	ctx context.Context,
	fn func(ctx context.Context, tx *sql.Tx) error,
) error

WithinTx executes a function within a database transaction. It begins a new transaction, executes the provided function, and then commits it. If the function returns an error or a panic occurs, the transaction is rolled back.

Usage:

err := pgLog.WithinTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
    // Perform database operations with tx
    return nil
})

Returns an error if the transaction fails to begin, commit, or if the provided function returns an error.

type PostgresOption

type PostgresOption func(*Postgres)

func WithPGMigrations added in v0.5.0

func WithPGMigrations(options migrations.Options) PostgresOption

WithPGMigrations configures migration behavior for the Postgres event log. Use this to skip automatic migrations or provide a custom logger.

type Sqlite

type Sqlite struct {
	// contains filtered or unexported fields
}

Sqlite is an implementation of event.Log for a SQLite database. It uses a dedicated table for events and a trigger to enforce optimistic concurrency control at the database level. This approach is highly reliable as it prevents race conditions during writes.

See `NewSqlite` for initialization.

func NewSqlite

func NewSqlite(db *sql.DB, opts ...SqliteOption) (*Sqlite, error)

NewSqlite creates a new Sqlite event log. Upon initialization, it ensures that the necessary database schema (table and trigger) is created. This setup is performed within a transaction, making it safe to call on application startup.

IMPORTANT: By default, this log uses a BLOB column and expects a binary-based encoder (e.g., codec.NewGOB() or codec.NewJSONB()) to be configured in the repository. Modify the migrations or create your own implementation of a store if you want a different format.

Usage:

db, err := sql.Open("sqlite3", "file:chronicle.db?cache=shared&mode=rwc")
if err != nil {
    log.Fatal(err)
}
sqliteLog, err := eventlog.NewSqlite(db)
if err != nil {
    log.Fatal(err)
}

Returns a configured `*Sqlite` instance or an error if the setup fails.

func (*Sqlite) AppendEvents

func (s *Sqlite) AppendEvents(
	ctx context.Context,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, error)

AppendEvents writes a batch of raw events for a given aggregate ID to the log. It wraps the entire operation in a new database transaction to ensure atomicity.

Usage:

newVersion, err := sqliteLog.AppendEvents(ctx, logID, expectedVersion, rawEvents)
if err != nil {
    var conflictErr *version.ConflictError
    if errors.As(err, &conflictErr) {
        // handle optimistic concurrency failure
    }
}

Returns the new version of the aggregate after the append, or an error. A `version.ConflictError` is returned if the expected version does not match.

func (*Sqlite) AppendInTx

func (s *Sqlite) AppendInTx(
	ctx context.Context,
	tx *sql.Tx,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, []*event.Record, error)

AppendInTx writes events within an existing database transaction. It relies on the database trigger to perform the optimistic concurrency check. If the check fails, the trigger raises an exception which is parsed into a `version.ConflictError`.

This method is primarily for internal use by `TransactionalRepository` or advanced scenarios.

Returns the new aggregate version, the records that were created, and an error if the operation fails.

func (*Sqlite) DangerouslyDeleteEventsUpTo added in v0.5.0

func (s *Sqlite) DangerouslyDeleteEventsUpTo(
	ctx context.Context,
	id event.LogID,
	version version.Version,
) error

⚠️⚠️⚠️ WARNING: Read carefully

DangerouslyDeleteEventsUpTo permanently deletes all events for a specific log ID up to and INCLUDING the specified version.

This operation is irreversible and breaks the immutability of the event log.

It is intended for use cases such as manually pruning event streams, and should be used with extreme caution.

Rebuilding aggregates or projections after this operation may lead to an inconsistent state.

It is recommended to call this only after generating a snapshot event of your aggregate state. Remember to also invalidate projections that depend on the deleted events, and any snapshots older than the version you pass to this function.

func (*Sqlite) ReadAllEvents

func (s *Sqlite) ReadAllEvents(
	ctx context.Context,
	globalSelector version.Selector,
) event.GlobalRecords

ReadAllEvents retrieves the global stream of all events across all aggregates, ordered chronologically by their global sequence number. This is useful for building projections or other system-wide consumers.

Usage:

globalRecords := sqliteLog.ReadAllEvents(ctx, version.Selector{From: 1})
for gRecord, err := range globalRecords {
    if err != nil {
        // handle the read error and stop iterating
        break
    }
    _ = gRecord // process global record for a projection
}

Returns an `event.GlobalRecords` iterator.

func (*Sqlite) ReadEvents

func (s *Sqlite) ReadEvents(
	ctx context.Context,
	id event.LogID,
	selector version.Selector,
) event.Records

ReadEvents retrieves the event history for a single aggregate, starting from a specified version. It returns an iterator for efficiently processing the stream.

Usage:

records := sqliteLog.ReadEvents(ctx, logID, version.SelectFromBeginning)
for record, err := range records {
    if err != nil {
        // handle the read error and stop iterating
        break
    }
    _ = record // process record
}

Returns an `event.Records` iterator.

func (*Sqlite) WithinTx

func (s *Sqlite) WithinTx(
	ctx context.Context,
	fn func(ctx context.Context, tx *sql.Tx) error,
) error

WithinTx executes a function within a database transaction. It begins a new transaction, executes the provided function, and then commits it. If the function returns an error or a panic occurs, the transaction is rolled back.

Usage:

err := sqliteLog.WithinTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
    // Perform database operations with tx
    return nil
})

Returns an error if the transaction fails to begin, commit, or if the provided function returns an error.

type SqliteOption

type SqliteOption func(*Sqlite)

func WithSqliteMigrations added in v0.5.0

func WithSqliteMigrations(options migrations.Options) SqliteOption

WithSqliteMigrations configures migration behavior for the SQLite event log. Use this to skip automatic migrations or provide a custom logger.
