Documentation
¶
Overview ¶
Package eventsource provides event store structures and abstractions.
Index ¶
- Variables
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) CommittedEvents() []*Event
- func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
- func (a *AggregateBase) ID() string
- func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
- func (a *AggregateBase) MustLatestCommittedEvent() *Event
- func (a *AggregateBase) Revision() int64
- func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
- func (a *AggregateBase) SetID(id string)
- func (a *AggregateBase) Timestamp() int64
- func (a *AggregateBase) Type() string
- func (a *AggregateBase) Version() int64
- type AggregateFactory
- type Config
- type Cursor
- type CursorAggregate
- type Event
- func (x *Event) Copy() *Event
- func (*Event) Descriptor() ([]byte, []int) deprecated
- func (x *Event) GetAggregateId() string
- func (x *Event) GetAggregateType() string
- func (x *Event) GetEventData() []byte
- func (x *Event) GetEventId() string
- func (x *Event) GetEventType() string
- func (x *Event) GetRevision() int64
- func (x *Event) GetTimestamp() int64
- func (*Event) ProtoMessage()
- func (x *Event) ProtoReflect() protoreflect.Message
- func (x *Event) Reset()
- func (x *Event) String() string
- func (x *Event) Time() time.Time
- type EventHandler
- type EventMessage
- type EventStore
- type IdGenerator
- type MockStore
- func (m *MockStore) Commit(arg0 context.Context, arg1 Aggregate) error
- func (m *MockStore) EXPECT() *MockStoreMockRecorder
- func (m *MockStore) LoadEventStream(arg0 context.Context, arg1 Aggregate) error
- func (m *MockStore) LoadEventStreamWithSnapshot(arg0 context.Context, arg1 Aggregate) error
- func (m *MockStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error
- func (a MockStore) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
- func (m *MockStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)
- func (m *MockStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)
- func (m *MockStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)
- type MockStoreMockRecorder
- func (mr *MockStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) LoadEventStream(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) LoadEventStreamWithSnapshot(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- type Projection
- type ProjectionFactory
- type Snapshot
- type Storage
- type StreamEventsRequest
- type UUIDGenerator
Constants ¶
This section is empty.
Variables ¶
var File_event_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type Aggregate ¶
type Aggregate interface {
Apply(e *Event) error
SetBase(base *AggregateBase)
AggBase() *AggregateBase
Reset()
}
Aggregate is an interface used for the aggregate models
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is the base of the aggregate, which exposes its ID, type, version, revision, timestamp and committed events.
func (*AggregateBase) CommittedEvents ¶
func (a *AggregateBase) CommittedEvents() []*Event
CommittedEvents gets the committed event messages.
func (*AggregateBase) DecodeEventAs ¶
func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
DecodeEventAs decodes the provided input eventData into the structure of eventMsg. The eventMsg is expected to be a pointer to the event message.
func (*AggregateBase) LatestCommittedEvent ¶
func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
LatestCommittedEvent gets the latest committed event message.
func (*AggregateBase) MustLatestCommittedEvent ¶
func (a *AggregateBase) MustLatestCommittedEvent() *Event
MustLatestCommittedEvent gets the latest committed event message or panics.
func (*AggregateBase) Revision ¶
func (a *AggregateBase) Revision() int64
Revision gets aggregate current revision.
func (*AggregateBase) SetEvent ¶
func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
SetEvent sets new event message into given aggregate.
func (*AggregateBase) Timestamp ¶
func (a *AggregateBase) Timestamp() int64
Timestamp gets the aggregate base timestamp.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int64
Version gets aggregate version.
type AggregateFactory ¶
type Config ¶
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig sets up the default config for the event store.
type Cursor ¶
type Cursor interface {
OpenChannel(withSnapshot bool) (<-chan *CursorAggregate, error)
}
Cursor is an interface used by the storages that enables listing the aggregates.
type CursorAggregate ¶
CursorAggregate holds the events and snapshot of an aggregate, as taken by the cursor.
type Event ¶
type Event struct {
EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
EventType string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"`
AggregateId string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"`
EventData []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"`
Timestamp int64 `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Revision int64 `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"`
// contains filtered or unexported fields
}
Event is the event source message model.
func (*Event) Descriptor
deprecated
func (*Event) GetAggregateId ¶
func (*Event) GetAggregateType ¶
func (*Event) GetEventData ¶
func (*Event) GetEventId ¶
func (*Event) GetEventType ¶
func (*Event) GetRevision ¶
func (*Event) GetTimestamp ¶
func (*Event) ProtoMessage ¶
func (*Event) ProtoMessage()
func (*Event) ProtoReflect ¶
func (x *Event) ProtoReflect() protoreflect.Message
type EventHandler ¶
EventHandler is an interface used for handling events.
type EventMessage ¶
type EventMessage interface {
MessageType() string
}
EventMessage is an interface that defines event messages.
type EventStore ¶
type EventStore interface {
SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
LoadEventStream(ctx context.Context, aggregate Aggregate) error
LoadEventStreamWithSnapshot(ctx context.Context, aggregate Aggregate) error
Commit(ctx context.Context, aggregate Aggregate) error
SaveSnapshot(ctx context.Context, aggregate Aggregate) error
StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)
StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)
}
EventStore is an interface used by the event store to load, commit and create snapshots of aggregates.
type IdGenerator ¶
type IdGenerator interface {
GenerateId() string
}
IdGenerator is the interface used by identity generators.
type MockStore ¶
type MockStore struct {
// contains filtered or unexported fields
}
MockStore is a mock of the EventStore interface.
func NewDefaultMockStore ¶
func NewDefaultMockStore(ctrl *gomock.Controller) *MockStore
NewDefaultMockStore creates new default mock store.
func NewMockStore ¶
func NewMockStore(ctrl *gomock.Controller, eventCodec, snapCodec codec.Codec, idGen IdGenerator) *MockStore
NewMockStore creates a new mock instance.
func (*MockStore) EXPECT ¶
func (m *MockStore) EXPECT() *MockStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockStore) LoadEventStream ¶
LoadEventStream mocks base method.
func (*MockStore) LoadEventStreamWithSnapshot ¶
LoadEventStreamWithSnapshot mocks base method.
func (*MockStore) SaveSnapshot ¶
SaveSnapshot mocks base method.
func (MockStore) SetAggregateBase ¶
SetAggregateBase implements AggregateBaseSetter interface.
func (*MockStore) StreamAggregates ¶
func (m *MockStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)
StreamAggregates mocks base method.
func (*MockStore) StreamEvents ¶
func (m *MockStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)
StreamEvents mocks base method.
func (*MockStore) StreamProjections ¶
func (m *MockStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)
StreamProjections mocks base method.
type MockStoreMockRecorder ¶
type MockStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockStoreMockRecorder is the mock recorder for MockStore.
func (*MockStoreMockRecorder) Commit ¶
func (mr *MockStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call
Commit indicates an expected call of Commit.
func (*MockStoreMockRecorder) LoadEventStream ¶
func (mr *MockStoreMockRecorder) LoadEventStream(arg0, arg1 interface{}) *gomock.Call
LoadEventStream indicates an expected call of LoadEventStream.
func (*MockStoreMockRecorder) LoadEventStreamWithSnapshot ¶
func (mr *MockStoreMockRecorder) LoadEventStreamWithSnapshot(arg0, arg1 interface{}) *gomock.Call
LoadEventStreamWithSnapshot indicates an expected call of LoadEventStreamWithSnapshot.
func (*MockStoreMockRecorder) SaveSnapshot ¶
func (mr *MockStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call
SaveSnapshot indicates an expected call of SaveSnapshot.
func (*MockStoreMockRecorder) StreamAggregates ¶
func (mr *MockStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
StreamAggregates indicates an expected call of StreamAggregates.
func (*MockStoreMockRecorder) StreamEvents ¶
func (mr *MockStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call
StreamEvents indicates an expected call of StreamEvents.
func (*MockStoreMockRecorder) StreamProjections ¶
func (mr *MockStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
StreamProjections indicates an expected call of StreamProjections.
type Projection ¶
Projection is an interface used to represent the query projection.
type ProjectionFactory ¶
type ProjectionFactory interface {
NewProjection(id string) Projection
}
ProjectionFactory is an interface used to create new projections.
type Snapshot ¶
type Snapshot struct {
AggregateId string `json:"aggregate_id,omitempty"`
AggregateType string `json:"aggregate_type,omitempty"`
AggregateVersion int64 `json:"aggregate_version,omitempty"`
Revision int64 `json:"revision,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
SnapshotData []byte `json:"snapshot_data,omitempty"`
}
Snapshot is a structure that defines the basic fields of the aggregate snapshot.
type Storage ¶
type Storage interface {
SaveEvents(ctx context.Context, es []*Event) error
GetEventStream(ctx context.Context, aggId string, aggType string) ([]*Event, error)
SaveSnapshot(ctx context.Context, snap *Snapshot) error
GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error)
GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error)
NewCursor(ctx context.Context, aggType string, aggVersion int64) (Cursor, error)
StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
}
Storage is the interface used by the event store as a storage for its events and snapshots.
type StreamEventsRequest ¶
type UUIDGenerator ¶
type UUIDGenerator struct{}
UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.
func (UUIDGenerator) GenerateId ¶
func (u UUIDGenerator) GenerateId() string
GenerateId generates an identifier. Implements IdGenerator interface.