Documentation
¶
Overview ¶
Package memory contains an example of a CQRS/ES app using memory as DB.
Example ¶
NOTE: Not named "Integration" to enable running with the unit tests.
// Create the event bus that distributes events.
eventBus := localEventBus.NewEventBus()
go func() {
for e := range eventBus.Errors() {
log.Printf("eventbus: %s", e.Error())
}
}()
// Create the event store.
eventStore, err := memoryEventStore.NewEventStore(
memoryEventStore.WithEventHandler(eventBus), // Add the event bus as a handler after save.
)
if err != nil {
log.Fatalf("could not create event store: %s", err)
}
// Create the command bus.
commandBus := bus.NewCommandHandler()
// Create the read repositories.
invitationRepo := memory.NewRepo()
invitationRepo.SetEntityFactory(func() eh.Entity { return &guestlist.Invitation{} })
guestListRepo := memory.NewRepo()
guestListRepo.SetEntityFactory(func() eh.Entity { return &guestlist.GuestList{} })
ctx := context.Background()
// Setup a test utility waiter that waits for all 11 events to occur before
// evaluating results.
var wg sync.WaitGroup
wg.Add(11)
eventBus.AddHandler(ctx, eh.MatchAll{}, eh.EventHandlerFunc(
func(ctx context.Context, e eh.Event) error {
wg.Done()
return nil
},
))
// Setup the guestlist.
eventID := uuid.New()
guestlist.Setup(
ctx,
eventStore,
eventBus, // Use the event bus both as local and global handler.
eventBus,
commandBus,
invitationRepo, guestListRepo,
eventID,
)
// --- Execute commands on the domain --------------------------------------
// IDs for all the guests.
athenaID := uuid.New()
hadesID := uuid.New()
zeusID := uuid.New()
poseidonID := uuid.New()
// Issue some invitations and responses. Error checking omitted here.
if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: athenaID, Name: "Athena", Age: 42}); err != nil {
log.Println("error:", err)
}
if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: hadesID, Name: "Hades"}); err != nil {
log.Println("error:", err)
}
if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: zeusID, Name: "Zeus"}); err != nil {
log.Println("error:", err)
}
if err := commandBus.HandleCommand(ctx, &guestlist.CreateInvite{ID: poseidonID, Name: "Poseidon"}); err != nil {
log.Println("error:", err)
}
// The invited guests accept and decline the event.
// Note that Athena tries to decline the event after first accepting, but
// that is not allowed by the domain logic in InvitationAggregate. The
// result is that she is still accepted.
if err := commandBus.HandleCommand(ctx, &guestlist.AcceptInvite{ID: athenaID}); err != nil {
log.Println("error:", err)
}
if err := commandBus.HandleCommand(ctx, &guestlist.DeclineInvite{ID: athenaID}); err != nil {
// NOTE: This error is supposed to be printed!
log.Printf("error: %s\n", err)
}
if err := commandBus.HandleCommand(ctx, &guestlist.AcceptInvite{ID: hadesID}); err != nil {
log.Println("error:", err)
}
if err := commandBus.HandleCommand(ctx, &guestlist.DeclineInvite{ID: zeusID}); err != nil {
log.Println("error:", err)
}
// Poseidon is a bit late to the party...
if err := commandBus.HandleCommand(ctx, &guestlist.AcceptInvite{ID: poseidonID}); err != nil {
log.Println("error:", err)
}
// Wait for simulated eventual consistency before reading.
wg.Wait()
time.Sleep(100 * time.Millisecond)
// Read all invites.
invitationStrs := []string{}
invitations, err := invitationRepo.FindAll(ctx)
if err != nil {
log.Println("error:", err)
}
for _, i := range invitations {
if i, ok := i.(*guestlist.Invitation); ok {
invitationStrs = append(invitationStrs, fmt.Sprintf("%s - %s", i.Name, i.Status))
}
}
// Sort the output to be able to compare test results.
sort.Strings(invitationStrs)
for _, s := range invitationStrs {
log.Printf("invitation: %s\n", s)
fmt.Printf("invitation: %s\n", s)
}
// Read the guest list.
guestList, err := guestListRepo.Find(ctx, eventID)
if err != nil {
log.Println("error:", err)
}
if l, ok := guestList.(*guestlist.GuestList); ok {
log.Printf("guest list: %d invited - %d accepted, %d declined - %d confirmed, %d denied\n",
l.NumGuests, l.NumAccepted, l.NumDeclined, l.NumConfirmed, l.NumDenied)
fmt.Printf("guest list: %d invited - %d accepted, %d declined - %d confirmed, %d denied\n",
l.NumGuests, l.NumAccepted, l.NumDeclined, l.NumConfirmed, l.NumDenied)
}
if err := eventBus.Close(); err != nil {
log.Println("error closing event bus:", err)
}
if err := invitationRepo.Close(); err != nil {
log.Println("error closing invitation repo:", err)
}
if err := guestListRepo.Close(); err != nil {
log.Println("error closing guest list repo:", err)
}
if err := eventStore.Close(); err != nil {
log.Println("error closing event store:", err)
}
Output: invitation: Athena - confirmed invitation: Hades - confirmed invitation: Poseidon - denied invitation: Zeus - declined guest list: 4 invited - 3 accepted, 1 declined - 2 confirmed, 1 denied
Click to show internal directories.
Click to hide internal directories.