Documentation ¶
Index ¶
- Variables
- type MemTx
- type Memory
- func (mem *Memory) AppendEvents(ctx context.Context, id event.LogID, expected version.Check, ...) (version.Version, error)
- func (mem *Memory) AppendInTx(ctx context.Context, _ MemTx, id event.LogID, expected version.Check, ...) (version.Version, []*event.Record, error)
- func (mem *Memory) DangerouslyDeleteEventsUpTo(ctx context.Context, id event.LogID, version version.Version) error
- func (mem *Memory) ReadAllEvents(ctx context.Context, globalSelector version.Selector) event.GlobalRecords
- func (store *Memory) ReadEvents(ctx context.Context, id event.LogID, selector version.Selector) event.Records
- func (mem *Memory) WithinTx(ctx context.Context, fn func(ctx context.Context, tx MemTx) error) error
- type MemoryOption
- type Postgres
- func (p *Postgres) AppendEvents(ctx context.Context, id event.LogID, expected version.Check, ...) (version.Version, error)
- func (p *Postgres) AppendInTx(ctx context.Context, tx *sql.Tx, id event.LogID, expected version.Check, ...) (version.Version, []*event.Record, error)
- func (p *Postgres) DangerouslyDeleteEventsUpTo(ctx context.Context, id event.LogID, version version.Version) error
- func (p *Postgres) ReadAllEvents(ctx context.Context, globalSelector version.Selector) event.GlobalRecords
- func (p *Postgres) ReadEvents(ctx context.Context, id event.LogID, selector version.Selector) event.Records
- func (p *Postgres) WithinTx(ctx context.Context, fn func(ctx context.Context, tx *sql.Tx) error) error
- type PostgresOption
- type Sqlite
- func (s *Sqlite) AppendEvents(ctx context.Context, id event.LogID, expected version.Check, ...) (version.Version, error)
- func (s *Sqlite) AppendInTx(ctx context.Context, tx *sql.Tx, id event.LogID, expected version.Check, ...) (version.Version, []*event.Record, error)
- func (s *Sqlite) DangerouslyDeleteEventsUpTo(ctx context.Context, id event.LogID, version version.Version) error
- func (s *Sqlite) ReadAllEvents(ctx context.Context, globalSelector version.Selector) event.GlobalRecords
- func (s *Sqlite) ReadEvents(ctx context.Context, id event.LogID, selector version.Selector) event.Records
- func (s *Sqlite) WithinTx(ctx context.Context, fn func(ctx context.Context, tx *sql.Tx) error) error
- type SqliteOption
Constants ¶
This section is empty.
Variables ¶
var (
	ErrUnsupportedCheck = errors.New("unsupported version check type")
	ErrNoEvents         = errors.New("empty events")
)
Functions ¶
This section is empty.
Types ¶
type MemTx ¶
type MemTx struct{}
MemTx is a dummy transaction handle for the in-memory store. Its presence in a function signature indicates that the function must be called within the critical section managed by WithinTx.
type Memory ¶
type Memory struct {
// contains filtered or unexported fields
}
func NewMemory ¶
func NewMemory(opts ...MemoryOption) *Memory
NewMemory creates a new in-memory event store.
func (*Memory) AppendEvents ¶
func (*Memory) AppendInTx ¶
func (*Memory) DangerouslyDeleteEventsUpTo ¶ added in v0.5.0
func (*Memory) ReadAllEvents ¶
func (mem *Memory) ReadAllEvents(
	ctx context.Context,
	globalSelector version.Selector,
) event.GlobalRecords
ReadAllEvents returns an iterator over all events in the store. If the store was created with WithMemoryGlobalTailing(), the iterator will first yield all existing events and then block indefinitely, yielding new events as they are appended.
func (*Memory) ReadEvents ¶
func (*Memory) WithinTx ¶
func (mem *Memory) WithinTx(
	ctx context.Context,
	fn func(ctx context.Context, tx MemTx) error,
) error
WithinTx executes the given function within a mutex-protected critical section, simulating a transaction. Note: This simple implementation does not support rollback on error; changes made to the store before an error occurs within the function will persist.
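The no-rollback behavior is easy to overlook, so here is a minimal sketch of a mutex-protected critical section with a token type. The names `store`, `memTx`, `withinTx`, and `appendInTx` are hypothetical stand-ins and not the package's own implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// memTx is a zero-size token (hypothetical stand-in for MemTx). Requiring it
// in a signature signals that the function is meant to run inside withinTx.
type memTx struct{}

// store is a hypothetical, minimal in-memory store.
type store struct {
	mu     sync.Mutex
	events []string
}

// withinTx runs fn while holding the mutex. Nothing is undone when fn fails.
func (s *store) withinTx(ctx context.Context, fn func(ctx context.Context, tx memTx) error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return fn(ctx, memTx{})
}

// appendInTx takes the token to document that the caller holds the lock.
func (s *store) appendInTx(_ memTx, ev string) {
	s.events = append(s.events, ev)
}

// failAfterAppend appends one event, then fails, and returns how many
// events are still stored afterwards.
func failAfterAppend() int {
	s := &store{}
	_ = s.withinTx(context.Background(), func(ctx context.Context, tx memTx) error {
		s.appendInTx(tx, "created")
		return errors.New("boom")
	})
	return len(s.events)
}

func main() {
	fmt.Println(failAfterAppend()) // prints 1: the append survives the error
}
```

Code that needs all-or-nothing semantics on the in-memory store would have to validate everything before mutating anything.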
type MemoryOption ¶ added in v0.5.0
type MemoryOption func(*Memory)
func WithMemoryGlobalTailing ¶ added in v0.5.0
func WithMemoryGlobalTailing() MemoryOption
WithMemoryGlobalTailing enables the "tailing" or "subscription" mode for ReadAllEvents. When enabled, the iterator will block and wait for new events after reading all historical ones, behaving like a channel.
type Postgres ¶
type Postgres struct {
// contains filtered or unexported fields
}
Postgres is an implementation of event.Log for a PostgreSQL database. It uses a dedicated table for events and a PL/pgSQL function with a trigger to enforce optimistic concurrency control at the database level. Because the version check runs inside the database rather than in application code, concurrent writers cannot race past it.
See `NewPostgres` for initialization.
func NewPostgres ¶
func NewPostgres(db *sql.DB, opts ...PostgresOption) (*Postgres, error)
NewPostgres creates a new Postgres event log. Upon initialization, it ensures that the necessary database schema (table, function, and trigger) is created. This setup is performed within a transaction, making it safe to call on application startup.
IMPORTANT: By default, this log uses a JSONB column and expects a JSON-based encoder (e.g., codec.NewJSONB()) to be configured in the repository. Modify the migrations or create your own implementation of a store if you want a different format.
Usage:
db, err := sql.Open("postgres", "user=... password=... dbname=... sslmode=disable")
if err != nil {
	log.Fatal(err)
}
pgLog, err := eventlog.NewPostgres(db)
if err != nil {
	log.Fatal(err)
}
Returns a configured `*Postgres` instance or an error if the setup fails.
func (*Postgres) AppendEvents ¶
func (p *Postgres) AppendEvents(
	ctx context.Context,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, error)
AppendEvents writes a batch of raw events for a given aggregate ID to the log. It wraps the entire operation in a new database transaction to ensure atomicity.
Usage:
newVersion, err := pgLog.AppendEvents(ctx, logID, expectedVersion, rawEvents)
if err != nil {
	var conflictErr *version.ConflictError
	if errors.As(err, &conflictErr) {
		// handle optimistic concurrency failure
	}
}
Returns the new version of the aggregate after the append, or an error. A `version.ConflictError` is returned if the expected version does not match.
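To make the expected-version check concrete, here is a self-contained sketch of optimistic concurrency on a single in-memory stream, including one retry after a conflict. `conflictError`, `stream`, and `appendWithRetry` are hypothetical and only mirror the shape of the API described above.

```go
package main

import (
	"errors"
	"fmt"
)

// conflictError is a hypothetical stand-in for version.ConflictError.
type conflictError struct {
	Expected, Actual uint64
}

func (e *conflictError) Error() string {
	return fmt.Sprintf("version conflict: expected %d, actual %d", e.Expected, e.Actual)
}

// stream is a hypothetical single-aggregate log; its version is its length.
type stream struct {
	events []string
}

// appendEvents succeeds only when expected equals the current version.
func (s *stream) appendEvents(expected uint64, events ...string) (uint64, error) {
	if len(events) == 0 {
		return 0, errors.New("empty events")
	}
	actual := uint64(len(s.events))
	if expected != actual {
		return actual, &conflictError{Expected: expected, Actual: actual}
	}
	s.events = append(s.events, events...)
	return uint64(len(s.events)), nil
}

// appendWithRetry retries once with the actual version after a conflict.
func appendWithRetry(s *stream, expected uint64, ev string) (uint64, error) {
	v, err := s.appendEvents(expected, ev)
	var conflict *conflictError
	if errors.As(err, &conflict) {
		// A real caller would reload the aggregate and re-validate the
		// command before retrying.
		return s.appendEvents(conflict.Actual, ev)
	}
	return v, err
}

func main() {
	s := &stream{}
	v1, _ := s.appendEvents(0, "opened")
	v2, err := appendWithRetry(s, 0, "deposited") // stale expected version
	fmt.Println(v1, v2, err)                      // prints 1 2 <nil>
}
```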
func (*Postgres) AppendInTx ¶
func (p *Postgres) AppendInTx(
	ctx context.Context,
	tx *sql.Tx,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, []*event.Record, error)
AppendInTx writes events within an existing database transaction. It relies on the `trg_chronicle_check_event_version` trigger in the database to perform the optimistic concurrency check. If the check fails, the trigger raises an exception which is parsed into a `version.ConflictError`.
This method is primarily for internal use by `TransactionalRepository` or advanced scenarios.
Returns the new aggregate version, the records that were created, and an error if the operation fails.
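The step of turning a trigger exception into a typed error can be sketched as follows. The message layout (`version conflict: expected N, actual M`) is an assumption made purely for illustration, as are `conflictError`, `parseConflict`, and `translate`; the text raised by the real trigger is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// conflictError is a hypothetical stand-in for version.ConflictError.
type conflictError struct {
	Expected, Actual uint64
}

func (e *conflictError) Error() string {
	return fmt.Sprintf("version conflict: expected %d, actual %d", e.Expected, e.Actual)
}

// conflictPrefix and the layout after it are assumptions for this sketch.
const conflictPrefix = "version conflict:"

// parseConflict extracts both versions from a database error message.
func parseConflict(msg string) (*conflictError, bool) {
	i := strings.Index(msg, conflictPrefix)
	if i < 0 {
		return nil, false
	}
	var expected, actual uint64
	n, err := fmt.Sscanf(msg[i:], conflictPrefix+" expected %d, actual %d", &expected, &actual)
	if err != nil || n != 2 {
		return nil, false
	}
	return &conflictError{Expected: expected, Actual: actual}, true
}

// translate turns a matching driver error into a typed conflict error and
// leaves every other error untouched.
func translate(err error) error {
	if err == nil {
		return nil
	}
	if conflict, ok := parseConflict(err.Error()); ok {
		return conflict
	}
	return err
}

func main() {
	raw := errors.New("driver: version conflict: expected 2, actual 3")
	var conflict *conflictError
	if errors.As(translate(raw), &conflict) {
		fmt.Println(conflict.Expected, conflict.Actual) // prints 2 3
	}
}
```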
func (*Postgres) DangerouslyDeleteEventsUpTo ¶ added in v0.5.0
func (p *Postgres) DangerouslyDeleteEventsUpTo(
	ctx context.Context,
	id event.LogID,
	version version.Version,
) error
⚠️⚠️⚠️ WARNING: Read carefully
DangerouslyDeleteEventsUpTo permanently deletes all events for a specific log ID up to and INCLUDING the specified version.
This operation is irreversible and breaks the immutability of the event log.
It is intended for manually pruning event streams and should be used with extreme caution.
Rebuilding aggregates or projections after this operation may lead to an inconsistent state.
Only call this after generating a snapshot event of your aggregate state. Remember to also invalidate projections that depend on deleted events and any snapshots older than the version you pass to this function.
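Since the bound is inclusive, an off-by-one here deletes the snapshot itself. A small in-memory sketch of the inclusive semantics, using a hypothetical `record` type and `deleteUpTo` helper:

```go
package main

import "fmt"

// record is a hypothetical stored event with its per-log version.
type record struct {
	Version uint64
	Name    string
}

// deleteUpTo drops every record whose version is <= upTo (inclusive).
func deleteUpTo(records []record, upTo uint64) []record {
	kept := make([]record, 0, len(records))
	for _, r := range records {
		if r.Version > upTo {
			kept = append(kept, r)
		}
	}
	return kept
}

func main() {
	log := []record{
		{1, "opened"}, {2, "deposited"}, {3, "snapshot"}, {4, "withdrew"},
	}
	// To keep the snapshot at version 3, pass 2 rather than 3.
	log = deleteUpTo(log, 2)
	fmt.Println(log) // prints [{3 snapshot} {4 withdrew}]
}
```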
func (*Postgres) ReadAllEvents ¶
func (p *Postgres) ReadAllEvents(
	ctx context.Context,
	globalSelector version.Selector,
) event.GlobalRecords
ReadAllEvents retrieves the global stream of all events across all aggregates, ordered chronologically by their global sequence number. This is useful for building projections or other system-wide consumers.
Usage:
globalRecords := pgLog.ReadAllEvents(ctx, version.Selector{From: 1})
for gRecord, err := range globalRecords {
	if err != nil {
		// handle the read error and stop iterating
		break
	}
	// process gRecord for a projection
}
Returns an `event.GlobalRecords` iterator.
func (*Postgres) ReadEvents ¶
func (p *Postgres) ReadEvents(
	ctx context.Context,
	id event.LogID,
	selector version.Selector,
) event.Records
ReadEvents retrieves the event history for a single aggregate, starting from a specified version. It returns an iterator for efficiently processing the stream.
Usage:
records := pgLog.ReadEvents(ctx, logID, version.SelectFromBeginning)
for record, err := range records {
	if err != nil {
		// handle the read error and stop iterating
		break
	}
	// process record
}
Returns an `event.Records` iterator.
func (*Postgres) WithinTx ¶
func (p *Postgres) WithinTx(
	ctx context.Context,
	fn func(ctx context.Context, tx *sql.Tx) error,
) error
WithinTx executes a function within a database transaction. It begins a new transaction, executes the provided function, and then commits it. If the function returns an error or a panic occurs, the transaction is rolled back.
Usage:
err := pgLog.WithinTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
	// Perform database operations with tx
	return nil
})
Returns an error if the transaction fails to begin, commit, or if the provided function returns an error.
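The commit-or-rollback contract described above is commonly implemented with a deferred function. The sketch below uses a small `txn` interface (satisfied by `*sql.Tx`) and a fake transaction so it runs without a database. `runInTx` is a hypothetical helper, and re-panicking after the rollback is an assumption about the desired behavior, not a statement about this package.

```go
package main

import (
	"errors"
	"fmt"
)

// txn is the subset of *sql.Tx this sketch needs; *sql.Tx satisfies it.
type txn interface {
	Commit() error
	Rollback() error
}

// fakeTx records how the transaction ended so the sketch needs no database.
type fakeTx struct{ outcome string }

func (t *fakeTx) Commit() error   { t.outcome = "committed"; return nil }
func (t *fakeTx) Rollback() error { t.outcome = "rolled back"; return nil }

// runInTx commits when fn succeeds and rolls back when fn fails or panics.
func runInTx(tx txn, fn func(tx txn) error) (err error) {
	defer func() {
		if p := recover(); p != nil {
			_ = tx.Rollback()
			panic(p) // re-panic after cleanup (an assumption of this sketch)
		}
		if err != nil {
			if rbErr := tx.Rollback(); rbErr != nil {
				err = errors.Join(err, rbErr)
			}
			return
		}
		err = tx.Commit()
	}()
	return fn(tx)
}

// outcomeOf reports how a transaction ended depending on whether fn fails.
func outcomeOf(fail bool) string {
	tx := &fakeTx{}
	_ = runInTx(tx, func(txn) error {
		if fail {
			return errors.New("boom")
		}
		return nil
	})
	return tx.outcome
}

func main() {
	fmt.Println(outcomeOf(false)) // prints committed
	fmt.Println(outcomeOf(true))  // prints rolled back
}
```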
type PostgresOption ¶
type PostgresOption func(*Postgres)
func WithPGMigrations ¶ added in v0.5.0
func WithPGMigrations(options migrations.Options) PostgresOption
WithPGMigrations configures migration behavior for the Postgres event log. Use this to skip automatic migrations or provide a custom logger.
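Options such as this one follow the functional options pattern. A minimal sketch of that pattern, in which `migrationOptions` and its `Skip` field are hypothetical and do not describe the real `migrations.Options` type:

```go
package main

import "fmt"

// migrationOptions is a hypothetical stand-in for migrations.Options.
type migrationOptions struct {
	Skip bool
}

// eventLog is a hypothetical log configured through functional options.
type eventLog struct {
	migrations migrationOptions
	migrated   bool
}

type option func(*eventLog)

// withMigrations overrides the default migration behavior.
func withMigrations(o migrationOptions) option {
	return func(l *eventLog) { l.migrations = o }
}

// newEventLog applies options over defaults, then migrates unless told to skip.
func newEventLog(opts ...option) *eventLog {
	l := &eventLog{}
	for _, opt := range opts {
		opt(l)
	}
	if !l.migrations.Skip {
		l.migrated = true // a real implementation would create the schema here
	}
	return l
}

func main() {
	fmt.Println(newEventLog().migrated)                                            // prints true
	fmt.Println(newEventLog(withMigrations(migrationOptions{Skip: true})).migrated) // prints false
}
```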
type Sqlite ¶
type Sqlite struct {
// contains filtered or unexported fields
}
Sqlite is an implementation of event.Log for a SQLite database. It uses a dedicated table for events and a trigger to enforce optimistic concurrency control at the database level. Because the version check runs inside the database rather than in application code, concurrent writers cannot race past it.
See `NewSqlite` for initialization.
func NewSqlite ¶
func NewSqlite(db *sql.DB, opts ...SqliteOption) (*Sqlite, error)
NewSqlite creates a new Sqlite event log. Upon initialization, it ensures that the necessary database schema (table and trigger) is created. This setup is performed within a transaction, making it safe to call on application startup.
IMPORTANT: By default, this log uses a BLOB column and expects a binary-based encoder (e.g., codec.NewGOB() or codec.NewJSONB()) to be configured in the repository. Modify the migrations or create your own implementation of a store if you want a different format.
Usage:
db, err := sql.Open("sqlite3", "file:chronicle.db?cache=shared&mode=rwc")
if err != nil {
	log.Fatal(err)
}
sqliteLog, err := eventlog.NewSqlite(db)
if err != nil {
	log.Fatal(err)
}
Returns a configured `*Sqlite` instance or an error if the setup fails.
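To show what a binary encoder produces for the BLOB column mentioned above, here is a round trip through the standard library's `encoding/gob`. The `deposited` payload and the `encode`/`decode` helpers are hypothetical; how the package's own codecs wrap the encoder is not shown here.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// deposited is a hypothetical event payload; gob only encodes exported fields.
type deposited struct {
	Account string
	Amount  int
}

// encode serializes the event into bytes suitable for a BLOB column.
func encode(ev deposited) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(ev); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode restores the event from the stored bytes.
func decode(data []byte) (deposited, error) {
	var ev deposited
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&ev)
	return ev, err
}

func main() {
	data, err := encode(deposited{Account: "account-1", Amount: 40})
	if err != nil {
		panic(err)
	}
	ev, err := decode(data)
	fmt.Println(ev, err) // prints {account-1 40} <nil>
}
```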
func (*Sqlite) AppendEvents ¶
func (s *Sqlite) AppendEvents(
	ctx context.Context,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, error)
AppendEvents writes a batch of raw events for a given aggregate ID to the log. It wraps the entire operation in a new database transaction to ensure atomicity.
Usage:
newVersion, err := sqliteLog.AppendEvents(ctx, logID, expectedVersion, rawEvents)
if err != nil {
	var conflictErr *version.ConflictError
	if errors.As(err, &conflictErr) {
		// handle optimistic concurrency failure
	}
}
Returns the new version of the aggregate after the append, or an error. A `version.ConflictError` is returned if the expected version does not match.
func (*Sqlite) AppendInTx ¶
func (s *Sqlite) AppendInTx(
	ctx context.Context,
	tx *sql.Tx,
	id event.LogID,
	expected version.Check,
	events event.RawEvents,
) (version.Version, []*event.Record, error)
AppendInTx writes events within an existing database transaction. It relies on the database trigger to perform the optimistic concurrency check. If the check fails, the trigger raises an exception which is parsed into a `version.ConflictError`.
This method is primarily for internal use by `TransactionalRepository` or advanced scenarios.
Returns the new aggregate version, the records that were created, and an error if the operation fails.
func (*Sqlite) DangerouslyDeleteEventsUpTo ¶ added in v0.5.0
func (s *Sqlite) DangerouslyDeleteEventsUpTo(
	ctx context.Context,
	id event.LogID,
	version version.Version,
) error
⚠️⚠️⚠️ WARNING: Read carefully
DangerouslyDeleteEventsUpTo permanently deletes all events for a specific log ID up to and INCLUDING the specified version.
This operation is irreversible and breaks the immutability of the event log.
It is intended for manually pruning event streams and should be used with extreme caution.
Rebuilding aggregates or projections after this operation may lead to an inconsistent state.
Only call this after generating a snapshot event of your aggregate state. Remember to also invalidate projections that depend on deleted events and any snapshots older than the version you pass to this function.
func (*Sqlite) ReadAllEvents ¶
func (s *Sqlite) ReadAllEvents(
	ctx context.Context,
	globalSelector version.Selector,
) event.GlobalRecords
ReadAllEvents retrieves the global stream of all events across all aggregates, ordered chronologically by their global sequence number. This is useful for building projections or other system-wide consumers.
Usage:
globalRecords := sqliteLog.ReadAllEvents(ctx, version.Selector{From: 1})
for gRecord, err := range globalRecords {
	if err != nil {
		// handle the read error and stop iterating
		break
	}
	// process gRecord for a projection
}
Returns an `event.GlobalRecords` iterator.
func (*Sqlite) ReadEvents ¶
func (s *Sqlite) ReadEvents(
	ctx context.Context,
	id event.LogID,
	selector version.Selector,
) event.Records
ReadEvents retrieves the event history for a single aggregate, starting from a specified version. It returns an iterator for efficiently processing the stream.
Usage:
records := sqliteLog.ReadEvents(ctx, logID, version.SelectFromBeginning)
for record, err := range records {
	if err != nil {
		// handle the read error and stop iterating
		break
	}
	// process record
}
Returns an `event.Records` iterator.
func (*Sqlite) WithinTx ¶
func (s *Sqlite) WithinTx(
	ctx context.Context,
	fn func(ctx context.Context, tx *sql.Tx) error,
) error
WithinTx executes a function within a database transaction. It begins a new transaction, executes the provided function, and then commits it. If the function returns an error or a panic occurs, the transaction is rolled back.
Usage:
err := sqliteLog.WithinTx(ctx, func(ctx context.Context, tx *sql.Tx) error {
	// Perform database operations with tx
	return nil
})
Returns an error if the transaction fails to begin, commit, or if the provided function returns an error.
type SqliteOption ¶
type SqliteOption func(*Sqlite)
func WithSqliteMigrations ¶ added in v0.5.0
func WithSqliteMigrations(options migrations.Options) SqliteOption
WithSqliteMigrations configures migration behavior for the SQLite event log. Use this to skip automatic migrations or provide a custom logger.