dcb

package
v0.0.0-...-17eb08c Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2026 License: MIT Imports: 8 Imported by: 0

Documentation

Overview

Package dcb implements Dynamic Consistency Boundary (DCB) event sourcing.

DCB is an alternative to traditional stream-per-decider event sourcing. Instead of partitioning events by stream ID, events live in a single stream per bounded context and are tagged with key:value pairs. Queries filter by event type and/or tags, and optimistic concurrency is based on global sequence position rather than per-stream version.

Key differences from traditional ES:

  • Events are tagged, not assigned to a stream
  • One event can affect multiple entities (no need for sagas in many cases)
  • Concurrency control uses append conditions on queries, not stream versions
  • Reading uses queries (event type + tags), not stream IDs

See https://dcb.events/ for the full specification.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrAppendConditionFailed is returned when the append condition check finds
	// matching events after the specified position.
	ErrAppendConditionFailed = errors.New("dcb: append condition failed — conflicting events found")
)

Functions

This section is empty.

Types

type AppendCondition

type AppendCondition struct {
	// FailIfEventsMatch is the query to check against. If any events matching
	// this query exist after the After position, the append fails.
	FailIfEventsMatch Query

	// After is the highest position the client was aware of when building
	// the decision model. Events at or before this position are ignored
	// when checking the condition. Use -1 or 0 to check all events.
	After int64
}

AppendCondition enforces consistency during append. If any events matching FailIfEventsMatch exist after the After position, the append is rejected.

type CommandHandler

type CommandHandler[S any, C any] struct {
	Decider Decider[S, C]
	Store   EventStore
}

CommandHandler processes commands using DCB.

func (*CommandHandler[S, C]) Handle

func (h *CommandHandler[S, C]) Handle(ctx context.Context, cmd C) (S, error)

Handle executes a command: reads relevant events, builds state, decides, appends.

type Decider

type Decider[S any, C any] struct {
	// Query returns the query needed to build the decision model for a command.
	Query func(cmd C) Query

	// Decide takes the current state and command, returning events or an error.
	Decide func(state S, cmd C) ([]Event, error)

	// Evolve applies a sequenced event to the current state.
	Evolve func(state S, event SequencedEvent) S

	// InitialState returns the zero state.
	InitialState func() S
}

Decider is the DCB equivalent of the traditional decider pattern. Instead of working with stream-based deciders, it uses tagged events and queries.

func (Decider[S, C]) Fold

func (d Decider[S, C]) Fold(events []SequencedEvent) S

Fold replays sequenced events from the initial state.

type Event

type Event struct {
	// Type identifies the kind of event (e.g., "CourseCapacityChanged").
	Type string

	// Data is the event payload. Serialization is handled by the store.
	Data any

	// Tags attach domain metadata for query-based filtering.
	Tags []Tag
}

Event is an unsaved event with type, data, and tags.

type EventStore

type EventStore interface {
	// Read returns events matching the query, optionally starting from a position.
	// Returns the matched events and the highest sequence position seen.
	// If no events match, returns an empty slice and position 0.
	Read(ctx context.Context, query Query, opts ...ReadOption) ([]SequencedEvent, int64, error)

	// Append persists one or more events atomically. If a condition is provided,
	// the append fails with ErrAppendConditionFailed if matching events exist
	// after the condition's After position.
	Append(ctx context.Context, events []Event, condition ...AppendCondition) error
}

EventStore is the interface for a DCB-compliant event store.

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory DCB event store, suitable for testing.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates a new in-memory DCB event store.

func (*MemoryStore) Append

func (s *MemoryStore) Append(_ context.Context, events []Event, condition ...AppendCondition) error

func (*MemoryStore) EventCount

func (s *MemoryStore) EventCount() int

EventCount returns the total number of events in the store.

func (*MemoryStore) Read

func (s *MemoryStore) Read(_ context.Context, query Query, opts ...ReadOption) ([]SequencedEvent, int64, error)

type Query

type Query struct {
	// contains filtered or unexported fields
}

Query describes constraints for filtering events. Items are combined with OR.

func NewQuery

func NewQuery(items ...QueryItem) Query

NewQuery creates a query from one or more items (combined with OR).

func QueryAll

func QueryAll() Query

QueryAll creates a query that matches all events in the store.

func (Query) IsAll

func (q Query) IsAll() bool

IsAll returns true if this query matches all events.

func (Query) Items

func (q Query) Items() []QueryItem

Items returns the query items. Empty if this is an "all" query.

func (Query) Matches

func (q Query) Matches(event SequencedEvent) bool

Matches returns true if the given event matches this query.

type QueryItem

type QueryItem struct {
	Types []string
	Tags  []Tag
}

QueryItem filters events by type and/or tags. An event matches if:

  • Its type is in the item's Types list (or Types is empty)
  • It has ALL of the item's Tags

func (QueryItem) Matches

func (qi QueryItem) Matches(event SequencedEvent) bool

Matches returns true if the event matches this query item.

type ReadOption

type ReadOption func(*readOptions)

ReadOption configures event reading.

func FromPosition

func FromPosition(pos int64) ReadOption

FromPosition starts reading from the given sequence position (exclusive).

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore is a SQLite-backed DCB event store.

func NewSQLiteStore

func NewSQLiteStore(dsn string) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLite DCB event store.

func NewSQLiteStoreFromDB

func NewSQLiteStoreFromDB(db *sql.DB) (*SQLiteStore, error)

NewSQLiteStoreFromDB wraps an existing *sql.DB.

func (*SQLiteStore) Append

func (s *SQLiteStore) Append(ctx context.Context, events []Event, condition ...AppendCondition) error

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes the underlying database connection.

func (*SQLiteStore) Read

func (s *SQLiteStore) Read(ctx context.Context, query Query, opts ...ReadOption) ([]SequencedEvent, int64, error)

type SequencedEvent

type SequencedEvent struct {
	// Position is the globally unique, monotonically increasing sequence position.
	Position int64

	// Type identifies the kind of event.
	Type string

	// Data is the deserialized event payload.
	Data any

	// Tags are the domain tags attached to this event.
	Tags []Tag

	// Timestamp is when the event was persisted.
	Timestamp time.Time
}

SequencedEvent is a persisted event with its assigned global sequence position.

func (SequencedEvent) HasTag

func (e SequencedEvent) HasTag(key, value string) bool

HasTag returns true if the event has a tag with the given key and value.

type Tag

type Tag struct {
	Key   string
	Value string
}

Tag is a key:value pair attached to an event for domain-specific partitioning. Examples: "course:c1", "student:s42", "order:ord-123".

func (Tag) String

func (t Tag) String() string

String returns the tag in "key:value" format.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL