Documentation
Overview ¶
Package dcb implements Dynamic Consistency Boundary (DCB) event sourcing.
DCB is an alternative to traditional stream-per-decider event sourcing. Instead of partitioning events by stream ID, events live in a single stream per bounded context and are tagged with key:value pairs. Queries filter by event type and/or tags, and optimistic concurrency is based on global sequence position rather than per-stream version.
Key differences from traditional event sourcing:
- Events are tagged, not assigned to a stream
- One event can affect multiple entities (no need for sagas in many cases)
- Concurrency control uses append conditions on queries, not stream versions
- Reading uses queries (event type + tags), not stream IDs
See https://dcb.events/ for the full specification.
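The tagging model described above can be illustrated with a small, self-contained sketch. It does not use this package: the `Tag` and `Event` types are local stand-ins (the `Key`/`Value` field names are an assumption), and the event names are purely illustrative. It shows one shared log in which a single event is tagged with two entities and is therefore visible from both.

```go
package main

import "fmt"

// Tag is a hypothetical key:value pair; the package's real Tag type
// is not shown in this listing.
type Tag struct{ Key, Value string }

// Event is a local stand-in carrying a position, a type, and tags.
type Event struct {
	Position int64
	Type     string
	Tags     []Tag
}

// hasTag reports whether the event carries the given key:value tag.
func hasTag(e Event, key, value string) bool {
	for _, t := range e.Tags {
		if t.Key == key && t.Value == value {
			return true
		}
	}
	return false
}

// byTag returns the positions of all events carrying the tag, in log order.
func byTag(log []Event, key, value string) []int64 {
	var out []int64
	for _, e := range log {
		if hasTag(e, key, value) {
			out = append(out, e.Position)
		}
	}
	return out
}

// sampleLog is a single shared log; events 2 and 3 each touch two entities.
func sampleLog() []Event {
	return []Event{
		{1, "CourseDefined", []Tag{{"course", "c1"}}},
		{2, "StudentSubscribed", []Tag{{"course", "c1"}, {"student", "s1"}}},
		{3, "StudentSubscribed", []Tag{{"course", "c2"}, {"student", "s1"}}},
	}
}

func main() {
	log := sampleLog()
	fmt.Println(byTag(log, "course", "c1"))  // [1 2]
	fmt.Println(byTag(log, "student", "s1")) // [2 3]
}
```

Event 2 appears in both results without being copied into two streams, which is the property that removes the need for a saga in many cases.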
Constants ¶
This section is empty.
Variables ¶
var (
// ErrAppendConditionFailed is returned when the append condition check finds
// matching events after the specified position.
ErrAppendConditionFailed = errors.New("dcb: append condition failed — conflicting events found")
)
Functions ¶
This section is empty.
Types ¶
type AppendCondition ¶
type AppendCondition struct {
// FailIfEventsMatch is the query to check against. If any events matching
// this query exist after the After position, the append fails.
FailIfEventsMatch Query
// After is the highest position the client was aware of when building
// the decision model. Events at or before this position are ignored
// when checking the condition. Use -1 or 0 to check all events.
After int64
}
AppendCondition enforces consistency during append. If any events matching FailIfEventsMatch exist after the After position, the append is rejected.
type CommandHandler ¶
type CommandHandler[S any, C any] struct {
Decider Decider[S, C]
Store EventStore
}
CommandHandler processes commands using DCB.
type Decider ¶
type Decider[S any, C any] struct {
// Query returns the query needed to build the decision model for a command.
Query func(cmd C) Query
// Decide takes the current state and command, returning events or an error.
Decide func(state S, cmd C) ([]Event, error)
// Evolve applies a sequenced event to the current state.
Evolve func(state S, event SequencedEvent) S
// InitialState returns the zero state.
InitialState func() S
}
Decider is the DCB equivalent of the traditional decider pattern. Instead of working with stream-based deciders, it uses tagged events and queries.
func (Decider[S, C]) Fold ¶
func (d Decider[S, C]) Fold(events []SequencedEvent) S
Fold replays sequenced events from the initial state.
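As an illustration, Fold can be read as a left fold of Evolve over the events, starting from InitialState. The sketch below assumes exactly that and uses trimmed local copies of the types (only the fields it needs, and a single type parameter), so it is not the package's source. The `seatCounter` decider and its event names are hypothetical.

```go
package main

import "fmt"

// SequencedEvent is trimmed to the fields this sketch needs.
type SequencedEvent struct {
	Position int64
	Type     string
}

// Decider is trimmed to the two fields a fold would plausibly use.
type Decider[S any] struct {
	InitialState func() S
	Evolve       func(state S, event SequencedEvent) S
}

// Fold is an assumed reading of the documented behaviour: apply Evolve
// to each event in order, starting from InitialState.
func (d Decider[S]) Fold(events []SequencedEvent) S {
	state := d.InitialState()
	for _, e := range events {
		state = d.Evolve(state, e)
	}
	return state
}

// seatCounter counts hypothetical "SeatReserved" events.
func seatCounter() Decider[int] {
	return Decider[int]{
		InitialState: func() int { return 0 },
		Evolve: func(n int, e SequencedEvent) int {
			if e.Type == "SeatReserved" {
				return n + 1
			}
			return n
		},
	}
}

func main() {
	events := []SequencedEvent{
		{1, "SeatReserved"},
		{2, "CourseRenamed"},
		{3, "SeatReserved"},
	}
	fmt.Println(seatCounter().Fold(events)) // 2
}
```

Folding an empty slice returns the initial state unchanged, which is what a command sees when no prior events match its query.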
type Event ¶
type Event struct {
// Type identifies the kind of event (e.g., "CourseCapacityChanged").
Type string
// Data is the event payload. Serialization is handled by the store.
Data any
// Tags attach domain metadata for query-based filtering.
Tags []Tag
}
Event is an unsaved event with type, data, and tags.
type EventStore ¶
type EventStore interface {
// Read returns events matching the query, optionally starting from a position.
// Returns the matched events and the highest sequence position seen.
// If no events match, returns an empty slice and position 0.
Read(ctx context.Context, query Query, opts ...ReadOption) ([]SequencedEvent, int64, error)
// Append persists one or more events atomically. If a condition is provided,
// the append fails with ErrAppendConditionFailed if matching events exist
// after the condition's After position.
Append(ctx context.Context, events []Event, condition ...AppendCondition) error
}
EventStore is the interface for a DCB-compliant event store.
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is an in-memory DCB event store, suitable for testing.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore creates a new in-memory DCB event store.
func (*MemoryStore) Append ¶
func (s *MemoryStore) Append(_ context.Context, events []Event, condition ...AppendCondition) error
func (*MemoryStore) EventCount ¶
func (s *MemoryStore) EventCount() int
EventCount returns the total number of events in the store.
func (*MemoryStore) Read ¶
func (s *MemoryStore) Read(_ context.Context, query Query, opts ...ReadOption) ([]SequencedEvent, int64, error)
type Query ¶
type Query struct {
// contains filtered or unexported fields
}
Query describes constraints for filtering events. Items are combined with OR.
func QueryAll ¶
func QueryAll() Query
QueryAll creates a query that matches all events in the store.
func (Query) Matches ¶
func (q Query) Matches(event SequencedEvent) bool
Matches returns true if the given event matches this query.
type QueryItem ¶
QueryItem filters events by type and/or tags. An event matches if both of the following hold:
- Its type is in the item's Types list (or Types is empty)
- It has ALL of the item's Tags
func (QueryItem) Matches ¶
func (qi QueryItem) Matches(event SequencedEvent) bool
Matches returns true if the event matches this query item.
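The matching rules can be written out as a short sketch. The types here are local stand-ins: the `Types` and `Tags` field names follow the description above, while `Tag`'s `Key`/`Value` fields and the `matchesAny` helper are assumptions. The sketch does not model QueryAll.

```go
package main

import "fmt"

// Tag is an assumed key:value pair; the real Tag type is not shown here.
type Tag struct{ Key, Value string }

// event is trimmed to the fields matching needs.
type event struct {
	Type string
	Tags []Tag
}

// item is a stand-in for QueryItem.
type item struct {
	Types []string
	Tags  []Tag
}

// hasTag reports whether the event carries exactly this tag.
func (e event) hasTag(t Tag) bool {
	for _, have := range e.Tags {
		if have == t {
			return true
		}
	}
	return false
}

// matches applies both rules: the type is in Types (or Types is empty),
// and every tag in Tags is present on the event.
func (it item) matches(e event) bool {
	typeOK := len(it.Types) == 0
	for _, t := range it.Types {
		if t == e.Type {
			typeOK = true
		}
	}
	if !typeOK {
		return false
	}
	for _, t := range it.Tags {
		if !e.hasTag(t) {
			return false
		}
	}
	return true
}

// matchesAny combines items with OR, as the Query description states.
func matchesAny(items []item, e event) bool {
	for _, it := range items {
		if it.matches(e) {
			return true
		}
	}
	return false
}

func main() {
	e := event{"StudentSubscribed", []Tag{{"course", "c1"}, {"student", "s1"}}}
	byCourse := item{Tags: []Tag{{"course", "c1"}}}
	wrongType := item{Types: []string{"CourseDefined"}, Tags: []Tag{{"course", "c1"}}}

	fmt.Println(byCourse.matches(e))                        // true
	fmt.Println(wrongType.matches(e))                       // false
	fmt.Println(matchesAny([]item{wrongType, byCourse}, e)) // true
}
```

Within one item the conditions narrow the result (AND), while adding more items to a query widens it (OR).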
type ReadOption ¶
type ReadOption func(*readOptions)
ReadOption configures event reading.
func FromPosition ¶
func FromPosition(pos int64) ReadOption
FromPosition starts reading from the given sequence position (exclusive).
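ReadOption follows Go's functional options pattern. The sketch below shows how such an option could be applied with exclusive semantics. It is a stand-alone illustration: the `from` field and the `read` helper are assumptions, since the real `readOptions` struct is unexported and not shown.

```go
package main

import "fmt"

// readOptions holds the settings an option can change. The field name
// is an assumption.
type readOptions struct{ from int64 }

// ReadOption matches the documented signature.
type ReadOption func(*readOptions)

// FromPosition records pos as an exclusive starting point.
func FromPosition(pos int64) ReadOption {
	return func(o *readOptions) { o.from = pos }
}

// read returns the positions strictly greater than the configured start.
// With no options, the start is 0 and every position is returned.
func read(positions []int64, opts ...ReadOption) []int64 {
	var o readOptions
	for _, opt := range opts {
		opt(&o)
	}
	var out []int64
	for _, p := range positions {
		if p > o.from {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	all := []int64{1, 2, 3, 4}
	fmt.Println(read(all))                  // [1 2 3 4]
	fmt.Println(read(all, FromPosition(2))) // [3 4]
}
```

Exclusive semantics mean the position returned by a previous read can be passed back unchanged to fetch only newer events.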
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
SQLiteStore is a SQLite-backed DCB event store.
func NewSQLiteStore ¶
func NewSQLiteStore(dsn string) (*SQLiteStore, error)
NewSQLiteStore creates a new SQLite DCB event store.
func NewSQLiteStoreFromDB ¶
func NewSQLiteStoreFromDB(db *sql.DB) (*SQLiteStore, error)
NewSQLiteStoreFromDB wraps an existing *sql.DB.
func (*SQLiteStore) Append ¶
func (s *SQLiteStore) Append(ctx context.Context, events []Event, condition ...AppendCondition) error
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close closes the underlying database connection.
func (*SQLiteStore) Read ¶
func (s *SQLiteStore) Read(ctx context.Context, query Query, opts ...ReadOption) ([]SequencedEvent, int64, error)
type SequencedEvent ¶
type SequencedEvent struct {
// Position is the globally unique, monotonically increasing sequence position.
Position int64
// Type identifies the kind of event.
Type string
// Data is the deserialized event payload.
Data any
// Tags are the domain tags attached to this event.
Tags []Tag
// Timestamp is when the event was persisted.
Timestamp time.Time
}
SequencedEvent is a persisted event with its assigned global sequence position.
func (SequencedEvent) HasTag ¶
func (e SequencedEvent) HasTag(key, value string) bool
HasTag returns true if the event has a tag with the given key and value.