Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func New ¶
func New(ctx context.Context, events <-chan event.Event, opts ...Option) (<-chan aggregate.History, <-chan error)
New creates a stream that converts events into aggregate Histories. It takes a channel of events and returns both a channel of aggregate Histories and an error channel.
Each History can be applied to an aggregate to build its current state.
Example usage:
var events <-chan event.Event
histories, errs := stream.New(ctx, events)
// Process results
for history := range histories {
foo := newFoo(history.Aggregate().ID)
history.Apply(foo)
// foo now contains the current state
}
func NewOf ¶ added in v0.1.2
func NewOf[D any, Event event.Of[D]](ctx context.Context, events <-chan Event, opts ...Option) (<-chan aggregate.History, <-chan error)
NewOf creates a typed stream that converts events into aggregate Histories. It works the same as New but accepts a typed event channel.
Each History can be applied to an aggregate to build its current state.
Example usage:
var events <-chan event.Of[MyData]
histories, errs := stream.NewOf(ctx, events)
// Process results
for history := range histories {
foo := newFoo(history.Aggregate().ID)
history.Apply(foo)
// foo now contains the current state
}
Types ¶
type Option ¶
type Option func(*options)
Option is a stream option.
func Errors ¶
Errors returns an Option that provides a Stream with error channels. A Stream will cancel its operation as soon as an error can be received from one of the error channels.
func Filter ¶
Filter returns an Option that filters incoming events before processing. Events are passed through each filter function in order. If any filter returns false, the event is discarded.
func Grouped ¶
Grouped returns an Option that optimizes aggregate builds by informing the Stream about the grouping of incoming events.
When Grouped is disabled (default), the Stream must wait for the input stream to close before returning aggregates, as it cannot determine when all events for a specific aggregate have been received.
When Grouped is enabled, the Stream assumes events are grouped by aggregate (name, ID) and can return each aggregate as soon as it receives the last event for that aggregate.
Enable this option only if the input stream guarantees that events are sequentially grouped by aggregate. Events within each group can be in any order unless Sorted is also enabled.
Example of correctly grouped events:
// Group 1: foo/BB... name="foo" id="BB..." version=2 name="foo" id="BB..." version=1 name="foo" id="BB..." version=4 name="foo" id="BB..." version=3 // Group 2: bar/AA... name="bar" id="AA..." version=1 name="bar" id="AA..." version=2 // Group 3: foo/AA... (different ID) name="foo" id="AA..." version=1 name="foo" id="AA..." version=2
func Sorted ¶
Sorted returns an Option that optimizes aggregate builds by informing the Stream about the sort order of incoming events.
When Sorted is disabled (default), the Stream sorts the collected events for each aggregate by AggregateVersion before applying them.
Enable this option only if the input event stream guarantees that events are already sorted by AggregateVersion for each aggregate.
func ValidateConsistency ¶
ValidateConsistency returns an Option that controls whether event consistency is validated before building aggregates.
When enabled (default), the Stream validates that events form a consistent sequence (no gaps in version numbers, correct aggregate references, etc.) before applying them to build an aggregate.
Disable this option only if event consistency is guaranteed by the source or if you explicitly want to allow aggregates in potentially invalid states.
func WithSoftDeleted ¶ added in v0.1.2
WithSoftDeleted returns an Option that controls whether soft-deleted aggregates are included in the output stream.
By default (false), soft-deleted aggregates are excluded from results. Set to true to include soft-deleted aggregates in the output.