stream

package
v0.6.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 20, 2025 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(ctx context.Context, events <-chan event.Event, opts ...Option) (<-chan aggregate.History, <-chan error)

New creates a stream that converts events into aggregate Histories. It takes a channel of events and returns both a channel of aggregate Histories and an error channel.

Each History can be applied to an aggregate to build its current state.

Example usage:

var events <-chan event.Event
histories, errs := stream.New(ctx, events)

// Process results
for history := range histories {
	foo := newFoo(history.Aggregate().ID)
	history.Apply(foo)
	// foo now contains the current state
}

func NewOf added in v0.1.2

func NewOf[D any, Event event.Of[D]](ctx context.Context, events <-chan Event, opts ...Option) (<-chan aggregate.History, <-chan error)

NewOf creates a typed stream that converts events into aggregate Histories. It works the same as New but accepts a typed event channel.

Each History can be applied to an aggregate to build its current state.

Example usage:

var events <-chan event.Of[MyData]
histories, errs := stream.NewOf(ctx, events)

// Process results
for history := range histories {
	foo := newFoo(history.Aggregate().ID)
	history.Apply(foo)
	// foo now contains the current state
}

Types

type Option

type Option func(*options)

Option is a stream option.

func Errors

func Errors(errs ...<-chan error) Option

Errors returns an Option that provides a Stream with error channels. A Stream will cancel its operation as soon as an error can be received from one of the error channels.

func Filter

func Filter(fns ...func(event.Event) bool) Option

Filter returns an Option that filters incoming events before processing. Events are passed through each filter function in order. If any filter returns false, the event is discarded.

func Grouped

func Grouped(v bool) Option

Grouped returns an Option that optimizes aggregate builds by informing the Stream about the grouping of incoming events.

When Grouped is disabled (default), the Stream must wait for the input stream to close before returning aggregates, as it cannot determine when all events for a specific aggregate have been received.

When Grouped is enabled, the Stream assumes events are grouped by aggregate (name, ID) and can return each aggregate as soon as it receives the last event for that aggregate.

Enable this option only if the input stream guarantees that events are sequentially grouped by aggregate. Events within each group can be in any order unless Sorted is also enabled.

Example of correctly grouped events:

// Group 1: foo/BB...
name="foo" id="BB..." version=2
name="foo" id="BB..." version=1
name="foo" id="BB..." version=4
name="foo" id="BB..." version=3
// Group 2: bar/AA...
name="bar" id="AA..." version=1
name="bar" id="AA..." version=2
// Group 3: foo/AA... (different ID)
name="foo" id="AA..." version=1
name="foo" id="AA..." version=2

func Sorted

func Sorted(v bool) Option

Sorted returns an Option that optimizes aggregate builds by informing the Stream about the sort order of incoming events.

When Sorted is disabled (default), the Stream sorts the collected events for each aggregate by AggregateVersion before applying them.

Enable this option only if the input event stream guarantees that events are already sorted by AggregateVersion for each aggregate.

func ValidateConsistency

func ValidateConsistency(v bool) Option

ValidateConsistency returns an Option that controls whether event consistency is validated before building aggregates.

When enabled (default), the Stream validates that events form a consistent sequence (no gaps in version numbers, correct aggregate references, etc.) before applying them to build an aggregate.

Disable this option only if event consistency is guaranteed by the source or if you explicitly want to allow aggregates in potentially invalid states.

func WithSoftDeleted added in v0.1.2

func WithSoftDeleted(v bool) Option

WithSoftDeleted returns an Option that controls whether soft-deleted aggregates are included in the output stream.

By default (false), soft-deleted aggregates are excluded from results. Set to true to include soft-deleted aggregates in the output.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL