goengine

package module
v0.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 7, 2021 License: MIT Imports: 9 Imported by: 4

README

hellofresh/engine

Build Status

Welcome to HelloFresh GoEngine!!

GoEngine provides you all the capabilities to build an Event sourced application in go. This was based on the initial project Engine for PHP

Components

Engine is divided in a few small independent components.

Install

go get -u github.com/hellofresh/goengine

Usage

Here you can check a small tutorial of how to use this component in an orders scenario.

Tutorial

Logging

GoEngine uses default log package for debug logging. If you want to use your own logger - goengine.SetLogHandler() is available. Here is how you can use, e.g. github.com/sirupsen/logrus for logging:

package main

import (
    "github.com/hellofresh/goengine"
    log "github.com/sirupsen/logrus"
)

func main() {
    goengine.SetLogHandler(func(msg string, fields map[string]interface{}, err error) {
        if nil == fields && nil == err {
            log.Debug(msg)
        } else {
            var entry *log.Entry
            if fields != nil {
                entry = log.WithFields(log.Fields(fields))
                if nil != err {
                    entry = entry.WithError(err)
                }
            } else {
                entry = log.WithError(err)
            }

            entry.Debug(msg)
        }
    })

    // do your application stuff
}

Contributing

Please see CONTRIBUTING for details.

License

The MIT License (MIT). Please see License File for more information.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Log added in v0.3.6

func Log(msg string, fields map[string]interface{}, err error)

Log ...

func SetLogHandler added in v0.3.6

func SetLogHandler(handler LogHandler)

SetLogHandler ...

Types

type AggregateRepository added in v0.3.6

type AggregateRepository interface {
	Load(string, StreamName) (*EventStream, error)
	Save(AggregateRoot, StreamName) error
	Reconstitute(string, AggregateRoot, StreamName) error
}

AggregateRepository ...

type AggregateRoot added in v0.3.6

type AggregateRoot interface {
	GetID() string
	GetVersion() int
	SetVersion(int)
	Apply(DomainEvent)
	GetUncommittedEvents() []*DomainMessage
}

AggregateRoot ...

type AggregateRootBased added in v0.3.6

type AggregateRootBased struct {
	ID string
	// contains filtered or unexported fields
}

AggregateRootBased ...

func NewAggregateRootBased added in v0.3.6

func NewAggregateRootBased(source interface{}) *AggregateRootBased

NewAggregateRootBased ...

func NewEventSourceBasedWithID added in v0.3.6

func NewEventSourceBasedWithID(source interface{}, id string) *AggregateRootBased

NewEventSourceBasedWithID ...

func (*AggregateRootBased) Apply added in v0.3.6

func (r *AggregateRootBased) Apply(event DomainEvent)

Apply ...

func (*AggregateRootBased) GetID added in v0.3.6

func (r *AggregateRootBased) GetID() string

GetID ...

func (*AggregateRootBased) GetUncommittedEvents added in v0.3.6

func (r *AggregateRootBased) GetUncommittedEvents() []*DomainMessage

GetUncommittedEvents ...

func (*AggregateRootBased) GetVersion added in v0.3.6

func (r *AggregateRootBased) GetVersion() int

GetVersion ...

func (*AggregateRootBased) Record added in v0.3.6

func (r *AggregateRootBased) Record(event DomainEvent)

Record ...

func (*AggregateRootBased) RecordThat added in v0.3.6

func (r *AggregateRootBased) RecordThat(event DomainEvent)

RecordThat ...

func (*AggregateRootBased) SetVersion added in v0.3.6

func (r *AggregateRootBased) SetVersion(version int)

SetVersion ...

type DomainEvent added in v0.3.6

type DomainEvent interface {
	OccurredOn() time.Time
}

DomainEvent ...

type DomainMessage added in v0.3.6

type DomainMessage struct {
	ID         string      `json:"aggregate_id,omitempty"`
	Version    int         `json:"version"`
	Payload    DomainEvent `json:"payload"`
	RecordedOn time.Time   `json:"recorded_on"`
}

DomainMessage ...

func NewDomainMessage added in v0.3.6

func NewDomainMessage(id string, version int, payload DomainEvent, recordedOn time.Time) *DomainMessage

NewDomainMessage ...

func RecordNow added in v0.3.6

func RecordNow(id string, version int, payload DomainEvent) *DomainMessage

RecordNow ...

func (*DomainMessage) String added in v0.3.6

func (dm *DomainMessage) String() string

String ...

type EventStore

type EventStore interface {
	Append(events *EventStream) error
	GetEventsFor(streamName StreamName, id string) (*EventStream, error)
	FromVersion(streamName StreamName, id string, version int) (*EventStream, error)
	CountEventsFor(streamName StreamName, id string) (int64, error)
}

EventStore ...

type EventStream

type EventStream struct {
	Name   StreamName
	Events []*DomainMessage
}

EventStream ...

func NewEventStream added in v0.3.6

func NewEventStream(name StreamName, events []*DomainMessage) *EventStream

NewEventStream ...

type InMemoryTypeRegistry added in v0.3.6

type InMemoryTypeRegistry struct {
	// contains filtered or unexported fields
}

InMemoryTypeRegistry implements the in memory strategy for the registry

func NewInMemoryTypeRegistry added in v0.3.6

func NewInMemoryTypeRegistry() *InMemoryTypeRegistry

NewInMemoryTypeRegistry creates a new in memory registry

func (*InMemoryTypeRegistry) Get added in v0.3.6

func (r *InMemoryTypeRegistry) Get(name string) (interface{}, error)

Get retrieves a reflect.Type based on a name

func (*InMemoryTypeRegistry) GetTypeByName added in v0.3.6

func (r *InMemoryTypeRegistry) GetTypeByName(typeName string) (reflect.Type, bool)

GetTypeByName ...

func (*InMemoryTypeRegistry) RegisterAggregate added in v0.3.6

func (r *InMemoryTypeRegistry) RegisterAggregate(aggregate AggregateRoot, events ...interface{})

RegisterAggregate ...

func (*InMemoryTypeRegistry) RegisterEvents added in v0.3.6

func (r *InMemoryTypeRegistry) RegisterEvents(events ...interface{})

RegisterEvents ...

func (*InMemoryTypeRegistry) RegisterType added in v0.3.6

func (r *InMemoryTypeRegistry) RegisterType(i interface{})

RegisterType adds a type in the registry

type LogHandler added in v0.3.6

type LogHandler func(msg string, fields map[string]interface{}, err error)

LogHandler ...

type MapBasedVersionedEventDispatcher added in v0.3.6

type MapBasedVersionedEventDispatcher struct {
	// contains filtered or unexported fields
}

MapBasedVersionedEventDispatcher is a simple implementation of the versioned event dispatcher. Using a map it registered event handlers to event types

func NewVersionedEventDispatcher added in v0.3.6

func NewVersionedEventDispatcher() *MapBasedVersionedEventDispatcher

NewVersionedEventDispatcher is a constructor for the MapBasedVersionedEventDispatcher

func (*MapBasedVersionedEventDispatcher) DispatchEvent added in v0.3.6

func (m *MapBasedVersionedEventDispatcher) DispatchEvent(event *DomainMessage) error

DispatchEvent executes all event handlers registered for the given event type

func (*MapBasedVersionedEventDispatcher) RegisterEventHandler added in v0.3.6

func (m *MapBasedVersionedEventDispatcher) RegisterEventHandler(event interface{}, handler VersionedEventHandler)

RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received

func (*MapBasedVersionedEventDispatcher) RegisterGlobalHandler added in v0.3.6

func (m *MapBasedVersionedEventDispatcher) RegisterGlobalHandler(handler VersionedEventHandler)

RegisterGlobalHandler allows a caller to register a wildcard event handler call on any event received

type PublisherRepository added in v0.3.6

type PublisherRepository struct {
	EventStore EventStore
	EventBus   VersionedEventPublisher
}

PublisherRepository ...

func NewPublisherRepository added in v0.3.6

func NewPublisherRepository(eventStore EventStore, eventBus VersionedEventPublisher) *PublisherRepository

NewPublisherRepository ...

func (*PublisherRepository) Load added in v0.3.6

func (r *PublisherRepository) Load(id string, streamName StreamName) (*EventStream, error)

Load ...

func (*PublisherRepository) Reconstitute added in v0.3.6

func (r *PublisherRepository) Reconstitute(id string, source AggregateRoot, streamName StreamName) error

Reconstitute ...

func (*PublisherRepository) Save added in v0.3.6

func (r *PublisherRepository) Save(aggregateRoot AggregateRoot, streamName StreamName) error

Save ...

type StreamName

type StreamName string

StreamName ...

type TypeRegistry added in v0.3.6

type TypeRegistry interface {
	GetTypeByName(string) (reflect.Type, bool)
	RegisterAggregate(AggregateRoot, ...interface{})
	RegisterEvents(...interface{})
	RegisterType(interface{})
	Get(string) (interface{}, error)
}

TypeRegistry is a registry for go types this is necessary since we can't create a type from a string and it's json. With this registry we can know how to create a type for that string

type VersionedEventDispatchManager added in v0.3.6

type VersionedEventDispatchManager struct {
	// contains filtered or unexported fields
}

VersionedEventDispatchManager is responsible for coordinating receiving messages from event receivers and dispatching them to the event dispatcher.

func NewVersionedEventDispatchManager added in v0.3.6

func NewVersionedEventDispatchManager(receiver VersionedEventReceiver, registry TypeRegistry) *VersionedEventDispatchManager

NewVersionedEventDispatchManager is a constructor for the VersionedEventDispatchManager

func (*VersionedEventDispatchManager) Listen added in v0.3.6

func (m *VersionedEventDispatchManager) Listen(stop <-chan bool, exclusive bool) error

Listen starts a listen loop processing channels related to new incoming events, errors and stop listening requests

func (*VersionedEventDispatchManager) RegisterEventHandler added in v0.3.6

func (m *VersionedEventDispatchManager) RegisterEventHandler(event interface{}, handler VersionedEventHandler)

RegisterEventHandler allows a caller to register an event handler given an event of the specified type being received

func (*VersionedEventDispatchManager) RegisterGlobalHandler added in v0.3.6

func (m *VersionedEventDispatchManager) RegisterGlobalHandler(handler VersionedEventHandler)

RegisterGlobalHandler allows a caller to register a wildcard event handler call on any event received

type VersionedEventDispatcher added in v0.3.6

type VersionedEventDispatcher interface {
	DispatchEvent(*DomainMessage) error
	RegisterEventHandler(event interface{}, handler VersionedEventHandler)
	RegisterGlobalHandler(handler VersionedEventHandler)
}

VersionedEventDispatcher is responsible for routing events from the event manager to call handlers responsible for processing received events

type VersionedEventHandler added in v0.3.6

type VersionedEventHandler func(*DomainMessage) error

VersionedEventHandler is a function that takes a versioned event

type VersionedEventPublisher added in v0.3.6

type VersionedEventPublisher interface {
	PublishEvents([]*DomainMessage) error
}

VersionedEventPublisher is responsible for publishing events that have been saved to the event store\repository

type VersionedEventReceiver added in v0.3.6

type VersionedEventReceiver interface {
	ReceiveEvents(VersionedEventReceiverOptions) error
}

VersionedEventReceiver is responsible for receiving globally published events

type VersionedEventReceiverOptions added in v0.3.6

type VersionedEventReceiverOptions struct {
	TypeRegistry TypeRegistry
	Close        chan chan error
	Error        chan error
	ReceiveEvent chan VersionedEventTransactedAccept
	Exclusive    bool
}

VersionedEventReceiverOptions is an initalization structure to communicate to and from an event receiver go routine

type VersionedEventTransactedAccept added in v0.3.6

type VersionedEventTransactedAccept struct {
	Event                 *DomainMessage
	ProcessedSuccessfully chan bool
}

VersionedEventTransactedAccept is the message routed from an event receiver to the event manager. Sometimes event receivers designed with reliable delivery require acknowledgements after a message has been received. The success channel here allows for such acknowledgements

Directories

Path Synopsis
cmd
goengine command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL