Documentation
¶
Overview ¶
Example ¶
package main
import (
"context"
"fmt"
"time"
)
// Status values stored in Shipment.Status by Shipment.On.
const (
// Packed is set when a ShipmentPacked event is applied.
Packed = "packed"
// PickedUp is set when a ShipmentPickedUp event is applied.
PickedUp = "picked-up"
// Shipped is set when a ShipmentShipped event is applied.
Shipped = "shipped"
)
// ShipmentPacked is the event produced by Shipment.Handle for a PackShipment command.
type ShipmentPacked struct{ EventSkeleton }
// ShipmentPickedUp is the event produced by Shipment.Handle for a PickupShipment command.
type ShipmentPickedUp struct{ EventSkeleton }
// ShipmentShipped is the event produced by Shipment.Handle for a ShipShipment command.
type ShipmentShipped struct{ EventSkeleton }
// Shipment is the example aggregate. It embeds AggregateRootBase and records
// the status set by the most recently applied shipment event.
type Shipment struct {
AggregateRootBase
// Status holds one of Packed, PickedUp or Shipped once an event has been applied.
Status string
}
// Handle converts a shipment command into the matching event and applies it
// to the aggregate as a new event (Apply is called with isNew set to true).
//
// Supported commands are PackShipment, PickupShipment and ShipShipment, which
// produce ShipmentPacked, ShipmentPickedUp and ShipmentShipped respectively.
// Any other value results in an "unexpected command" error. The ctx argument
// is not used by this implementation.
func (s *Shipment) Handle(ctx context.Context, v interface{}) error {
	// skeleton builds the common event payload: the current time plus the
	// ID of the aggregate the command targets.
	skeleton := func(aggregateID string) EventSkeleton {
		return EventSkeleton{At: time.Now(), ID: aggregateID}
	}

	var event Event
	switch c := v.(type) {
	case PackShipment:
		event = &ShipmentPacked{skeleton(c.AggregateID())}
	case PickupShipment:
		event = &ShipmentPickedUp{skeleton(c.AggregateID())}
	case ShipShipment:
		event = &ShipmentShipped{skeleton(c.AggregateID())}
	default:
		return fmt.Errorf("unexpected command %T", c)
	}

	return s.Apply(s, event, true)
}
// On updates the aggregate from a shipment event: it copies the event's
// aggregate ID into s.ID and sets s.Status to the status matching the event
// type. Events other than *ShipmentPacked, *ShipmentPickedUp and
// *ShipmentShipped leave the aggregate untouched and return an
// "unexpected event" error.
func (s *Shipment) On(e Event) error {
	var status string
	switch e.(type) {
	case *ShipmentPacked:
		status = Packed
	case *ShipmentPickedUp:
		status = PickedUp
	case *ShipmentShipped:
		status = Shipped
	default:
		return fmt.Errorf("unexpected event %T", e)
	}

	// Every recognised event updates both fields in the same way.
	s.ID = e.AggregateID()
	s.Status = status
	return nil
}
// PackShipment is the command that Shipment.Handle turns into a ShipmentPacked event.
type PackShipment struct{ Command }
// PickupShipment is the command that Shipment.Handle turns into a ShipmentPickedUp event.
type PickupShipment struct{ Command }
// ShipShipment is the command that Shipment.Handle turns into a ShipmentShipped event.
type ShipShipment struct{ Command }
// New always reports true. Together with its signature this gives PackShipment
// the method required by the Constructor interface (New() bool).
func (PackShipment) New() bool {
	return true
}
func main() {
marshaler := new(JsonEventMarshaler)
marshaler.Bind(ShipmentPacked{}, ShipmentPickedUp{}, ShipmentShipped{})
var (
ctx = context.Background()
shipment = &Shipment{}
repo = NewRepository(shipment, WithMarshaler(marshaler))
bus = NewCommandBus(repo)
)
bus.Send(ctx, PackShipment{Command{ID: "abc123"}})
aggregate, _ := repo.GetByID(ctx, "abc123")
shipment, _ = aggregate.(*Shipment)
fmt.Println(shipment.Status)
fmt.Println(shipment.GetVersion())
// Output: packed
// Output: 0
bus.Send(ctx, PickupShipment{Command{ID: shipment.AggregateRootID()}})
aggregate, _ = repo.GetByID(ctx, shipment.AggregateRootID())
shipment, _ = aggregate.(*Shipment)
fmt.Println(shipment.Status)
fmt.Println(shipment.GetVersion())
// Output: picked-up
Output: shipped Output: 2
Index ¶
- Variables
- func NewCommandBus(repo AggregateRootRepository, middlewares ...command.Middleware) command.CommandSender
- type AggregateCommand
- type AggregateHandler
- type AggregateRepository
- func (r *AggregateRepository) GetByID(ctx context.Context, aggrID string) (AggregateRoot, error)
- func (r *AggregateRepository) LoadFromSnap(ctx context.Context, aggrID string) (SnapshottingBehaviour, error)
- func (r *AggregateRepository) New() interface{}
- func (r *AggregateRepository) Save(ctx context.Context, aggr AggregateRoot) error
- type AggregateRoot
- type AggregateRootBase
- func (aggr *AggregateRootBase) AggregateRootID() string
- func (aggr *AggregateRootBase) Apply(aggregate AggregateRoot, e Event, isNew bool) error
- func (aggr *AggregateRootBase) CommitEvents()
- func (aggr *AggregateRootBase) GetUncommitedEvents() []Event
- func (aggr *AggregateRootBase) GetVersion() int
- func (aggr *AggregateRootBase) LoadFromHistory(aggregate AggregateRoot, history []Event) error
- func (i *AggregateRootBase) StreamSize() int
- type AggregateRootRepository
- type AggregateRootSnapshotRepository
- type Command
- type Constructor
- type EpochMillis
- type Event
- type EventMarshaler
- type EventModel
- type EventSkeleton
- type EventStore
- type History
- type JsonEventMarshaler
- type JsonSnapshotMarshaler
- type Option
- type Snapshot
- type SnapshotMarshaler
- type SnapshotModel
- type SnapshotRepository
- type SnapshotSkeleton
- type SnapshotStore
- type SnapshottingBehaviour
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrSnapNotFound = errors.New("snapshot not found")
Functions ¶
func NewCommandBus ¶
func NewCommandBus(repo AggregateRootRepository, middlewares ...command.Middleware) command.CommandSender
Types ¶
type AggregateCommand ¶
type AggregateCommand interface {
AggregateID() string
}
AggregateCommand requires a command to carry an aggregate ID, which can be used to replay the aggregate's events from the event store and rebuild its current state.
type AggregateHandler ¶
type AggregateRepository ¶
type AggregateRepository struct {
Marshaler EventMarshaler
// contains filtered or unexported fields
}
func (*AggregateRepository) GetByID ¶
func (r *AggregateRepository) GetByID(ctx context.Context, aggrID string) (AggregateRoot, error)
func (*AggregateRepository) LoadFromSnap ¶
func (r *AggregateRepository) LoadFromSnap(ctx context.Context, aggrID string) (SnapshottingBehaviour, error)
func (*AggregateRepository) New ¶
func (r *AggregateRepository) New() interface{}
func (*AggregateRepository) Save ¶
func (r *AggregateRepository) Save(ctx context.Context, aggr AggregateRoot) error
type AggregateRoot ¶
type AggregateRootBase ¶
func (*AggregateRootBase) AggregateRootID ¶
func (aggr *AggregateRootBase) AggregateRootID() string
func (*AggregateRootBase) Apply ¶
func (aggr *AggregateRootBase) Apply(aggregate AggregateRoot, e Event, isNew bool) error
func (*AggregateRootBase) CommitEvents ¶
func (aggr *AggregateRootBase) CommitEvents()
func (*AggregateRootBase) GetUncommitedEvents ¶
func (aggr *AggregateRootBase) GetUncommitedEvents() []Event
func (*AggregateRootBase) GetVersion ¶
func (aggr *AggregateRootBase) GetVersion() int
func (*AggregateRootBase) LoadFromHistory ¶
func (aggr *AggregateRootBase) LoadFromHistory(aggregate AggregateRoot, history []Event) error
func (*AggregateRootBase) StreamSize ¶
func (i *AggregateRootBase) StreamSize() int
type AggregateRootRepository ¶
type AggregateRootRepository interface {
New() interface{}
Save(ctx context.Context, agr AggregateRoot) error
GetByID(ctx context.Context, aggregateRootID string) (AggregateRoot, error)
}
func NewRepository ¶
func NewRepository(aggr AggregateRoot, opts ...Option) AggregateRootRepository
type AggregateRootSnapshotRepository ¶
type AggregateRootSnapshotRepository interface {
Save(ctx context.Context, snap Snapshot) error
GetByID(ctx context.Context, aggregateRootID string, version int) (Snapshot, error)
}
func NewSnapRepository ¶
func NewSnapRepository(store SnapshotStore, marshaler SnapshotMarshaler) AggregateRootSnapshotRepository
type Command ¶
type Command struct {
ID string
}
Command is a model for AggregateCommand, used to avoid boilerplate code in commands implementing AggregateCommand.
func (Command) AggregateID ¶
type Constructor ¶
type Constructor interface {
New() bool
}
Constructor is implemented by commands that can be used to construct a new aggregate.
type EpochMillis ¶
type EpochMillis int64
func Now ¶
func Now() EpochMillis
func Time ¶
func Time(t time.Time) EpochMillis
func (EpochMillis) Int64 ¶
func (e EpochMillis) Int64() int64
func (EpochMillis) String ¶
func (e EpochMillis) String() string
func (EpochMillis) Time ¶
func (e EpochMillis) Time() time.Time
type EventMarshaler ¶
type EventMarshaler interface {
Bind(e ...Event) error
Marshal(e Event) (EventModel, error)
Unmarshal(e EventModel) (Event, error)
}
type EventModel ¶
type EventModel struct {
// Version is the event version the Data represents
Version int
// At indicates when the event happened; provided as a utility for the store
At EpochMillis
// Data contains the Serializer encoded version of the data
Data []byte
}
EventModel provides the shape of the records to be saved to the db
type EventSkeleton ¶
type EventSkeleton struct {
// ID contains the AggregateID
ID string
// Version holds the event version
Version int
// At contains the event time
At time.Time
}
EventSkeleton provides a default implementation of an Event that is suitable for being embedded
func (EventSkeleton) AggregateID ¶
func (m EventSkeleton) AggregateID() string
AggregateID implements part of the Event interface
func (EventSkeleton) EventAt ¶
func (m EventSkeleton) EventAt() time.Time
EventAt implements part of the Event interface
func (EventSkeleton) EventVersion ¶
func (m EventSkeleton) EventVersion() int
EventVersion implements part of the Event interface
type EventStore ¶
type EventStore interface {
SaveEvents(ctx context.Context, aggrID string, models History, version int) error
GetEventsForAggregate(ctx context.Context, aggrID string, version int) (History, error)
}
func NewInmemEventStore ¶
func NewInmemEventStore() EventStore
type History ¶
type History []EventModel
type JsonEventMarshaler ¶
type JsonEventMarshaler struct {
// contains filtered or unexported fields
}
func (*JsonEventMarshaler) Bind ¶
func (m *JsonEventMarshaler) Bind(events ...Event) error
func (*JsonEventMarshaler) Marshal ¶
func (m *JsonEventMarshaler) Marshal(e Event) (EventModel, error)
func (*JsonEventMarshaler) Unmarshal ¶
func (m *JsonEventMarshaler) Unmarshal(model EventModel) (Event, error)
type JsonSnapshotMarshaler ¶
type JsonSnapshotMarshaler struct {
// contains filtered or unexported fields
}
func (*JsonSnapshotMarshaler) Bind ¶
func (m *JsonSnapshotMarshaler) Bind(states ...interface{}) error
func (*JsonSnapshotMarshaler) Marshal ¶
func (m *JsonSnapshotMarshaler) Marshal(s Snapshot) (SnapshotModel, error)
func (*JsonSnapshotMarshaler) Unmarshal ¶
func (m *JsonSnapshotMarshaler) Unmarshal(model SnapshotModel) (Snapshot, error)
type Option ¶
type Option func(r *AggregateRepository)
func WithDefaultSnapRepository ¶
func WithDefaultSnapRepository(states ...interface{}) Option
func WithEventStore ¶
func WithEventStore(s EventStore) Option
func WithMarshaler ¶
func WithMarshaler(m EventMarshaler) Option
func WithSnapRepository ¶
func WithSnapRepository(s SnapshotStore, m SnapshotMarshaler) Option
type SnapshotMarshaler ¶
type SnapshotMarshaler interface {
Bind(v ...interface{}) error
Marshal(s Snapshot) (SnapshotModel, error)
Unmarshal(m SnapshotModel) (Snapshot, error)
}
type SnapshotModel ¶
type SnapshotRepository ¶
type SnapshotRepository struct {
// contains filtered or unexported fields
}
type SnapshotSkeleton ¶
func (SnapshotSkeleton) AggregateRootID ¶
func (s SnapshotSkeleton) AggregateRootID() string
func (SnapshotSkeleton) CurrentVersion ¶
func (s SnapshotSkeleton) CurrentVersion() int
func (SnapshotSkeleton) GetState ¶
func (s SnapshotSkeleton) GetState() interface{}
type SnapshotStore ¶
type SnapshotStore interface {
SaveSnapshot(ctx context.Context, agrID string, model SnapshotModel, version int) error
GetSnapshotForAggregate(ctx context.Context, agrID string, version int) (SnapshotModel, error)
}
func NewInmemSnapStore ¶
func NewInmemSnapStore() SnapshotStore
type SnapshottingBehaviour ¶
type SnapshottingBehaviour interface {
AggregateRoot
SnapshotInterval() int
GetState() interface{}
ApplyState(s Snapshot)
SnapshottingEnable() bool
}
Directories
¶
| Path | Synopsis |
|---|---|
|
_examples
|
|
|
custom
command
|
|
|
simple
command
|
|
|
snapshotting
command
|
|
|
cmd
|
|
|
playground
command
|
|
|
eventstore
|
|