Documentation
¶
Overview ¶
Package eventsource provides event store structures and abstractions.
Package eventsource is a generated GoMock package.
Index ¶
- Variables
- type Aggregate
- type AggregateBase
- func (a *AggregateBase) CommittedEvents() []*Event
- func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
- func (a *AggregateBase) ID() string
- func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
- func (a *AggregateBase) MarkEventsCommitted()
- func (a *AggregateBase) MustLatestCommittedEvent() *Event
- func (a *AggregateBase) Revision() int64
- func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
- func (a *AggregateBase) SetID(id string)
- func (a *AggregateBase) Timestamp() int64
- func (a *AggregateBase) Type() string
- func (a *AggregateBase) Version() int64
- type AggregateBaseSetter
- type AggregateFactory
- type Config
- type Cursor
- type CursorAggregate
- type Event
- func (x *Event) Copy() *Event
- func (*Event) Descriptor() ([]byte, []int) (deprecated)
- func (x *Event) GetAggregateId() string
- func (x *Event) GetAggregateType() string
- func (x *Event) GetEventData() []byte
- func (x *Event) GetEventId() string
- func (x *Event) GetEventType() string
- func (x *Event) GetRevision() int64
- func (x *Event) GetTimestamp() int64
- func (*Event) ProtoMessage()
- func (x *Event) ProtoReflect() protoreflect.Message
- func (x *Event) Reset()
- func (x *Event) String() string
- func (x *Event) Time() time.Time
- type EventCodec
- type EventHandler
- type EventMessage
- type EventStore
- type IdGenerator
- type MockEventStore
- func (m *MockEventStore) Commit(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
- func (m *MockEventStore) LoadEvents(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) LoadEventsWithSnapshot(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error
- func (m *MockEventStore) SetAggregateBase(arg0 Aggregate, arg1, arg2 string, arg3 int64)
- func (m *MockEventStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)
- func (m *MockEventStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)
- func (m *MockEventStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)
- type MockEventStoreMockRecorder
- func (mr *MockEventStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) LoadEvents(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) LoadEventsWithSnapshot(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) SetAggregateBase(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call
- func (mr *MockEventStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
- type Projection
- type ProjectionFactory
- type Snapshot
- type SnapshotCodec
- type Storage
- type Store
- func (e *Store) Commit(ctx context.Context, agg Aggregate) error
- func (e *Store) CopyWithStorage(storage Storage) *Store
- func (e *Store) LoadEvents(ctx context.Context, agg Aggregate) error
- func (e *Store) LoadEventsWithSnapshot(ctx context.Context, agg Aggregate) error
- func (e *Store) SaveSnapshot(ctx context.Context, agg Aggregate) error
- func (e *Store) StreamAggregates(ctx context.Context, aggType string, aggVersion int64, ...) (<-chan Aggregate, error)
- func (e *Store) StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
- func (e *Store) StreamProjections(ctx context.Context, aggType string, aggVersion int64, ...) (<-chan Projection, error)
- type StreamEventsRequest
- type UUIDGenerator
Constants ¶
This section is empty.
Variables ¶
var File_event_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type Aggregate ¶
type Aggregate interface {
Apply(e *Event) error
SetBase(base *AggregateBase)
AggBase() *AggregateBase
Reset()
}
Aggregate is an interface used for the aggregate models
type AggregateBase ¶
type AggregateBase struct {
// contains filtered or unexported fields
}
AggregateBase is the base of the aggregate which exposes its ID, type, version, revision, timestamp and committed events.
func (*AggregateBase) CommittedEvents ¶
func (a *AggregateBase) CommittedEvents() []*Event
CommittedEvents gets the committed event messages.
func (*AggregateBase) DecodeEventAs ¶
func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error
DecodeEventAs decodes the provided input eventData into the structure of eventMsg. The eventMsg is expected to be a pointer to the event message.
func (*AggregateBase) LatestCommittedEvent ¶
func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)
LatestCommittedEvent gets the latest committed event message.
func (*AggregateBase) MarkEventsCommitted ¶ added in v0.0.12
func (a *AggregateBase) MarkEventsCommitted()
MarkEventsCommitted marks the aggregate events as committed. NOTE: Use this function carefully, as the event store won't try to commit events that are already marked as committed.
func (*AggregateBase) MustLatestCommittedEvent ¶
func (a *AggregateBase) MustLatestCommittedEvent() *Event
MustLatestCommittedEvent gets the latest committed event message or panics.
func (*AggregateBase) Revision ¶
func (a *AggregateBase) Revision() int64
Revision gets aggregate current revision.
func (*AggregateBase) SetEvent ¶
func (a *AggregateBase) SetEvent(eventMsg EventMessage) error
SetEvent sets new event message into given aggregate.
func (*AggregateBase) Timestamp ¶
func (a *AggregateBase) Timestamp() int64
Timestamp gets the aggregate base timestamp.
func (*AggregateBase) Version ¶
func (a *AggregateBase) Version() int64
Version gets aggregate version.
type AggregateBaseSetter ¶ added in v0.0.11
type AggregateBaseSetter struct {
// contains filtered or unexported fields
}
AggregateBaseSetter is a structure responsible for setting the aggregate base.
func NewAggregateBaseSetter ¶ added in v0.0.11
func NewAggregateBaseSetter(eventCodec, snapCodec codec.Codec, idGen IdGenerator) *AggregateBaseSetter
NewAggregateBaseSetter creates new aggregate setter.
func (*AggregateBaseSetter) SetAggregateBase ¶ added in v0.0.11
func (a *AggregateBaseSetter) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
SetAggregateBase sets the AggregateBase within an aggregate. It satisfies the SetAggregateBase method of the EventStore interface.
type AggregateFactory ¶
AggregateFactory is a factory interface used to create new Aggregate models.
type Config ¶
type Config struct {
BufferSize int
}
Config is the configuration for the eventsource storage.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig sets up the default config for the event store.
type Cursor ¶
type Cursor interface {
// GetAggregateStream gets the stream of aggregates.
GetAggregateStream(withSnapshot bool) (<-chan *CursorAggregate, error)
}
Cursor is an interface used by the storages that enables listing the aggregates.
type CursorAggregate ¶
CursorAggregate holds an aggregate's events and snapshot taken by the cursor.
type Event ¶
type Event struct {
EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
EventType string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"`
AggregateId string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"`
EventData []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"`
Timestamp int64 `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Revision int64 `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"`
// contains filtered or unexported fields
}
Event is the event source message model.
func (*Event) Descriptor
deprecated
func (*Event) GetAggregateId ¶
func (*Event) GetAggregateType ¶
func (*Event) GetEventData ¶
func (*Event) GetEventId ¶
func (*Event) GetEventType ¶
func (*Event) GetRevision ¶
func (*Event) GetTimestamp ¶
func (*Event) ProtoMessage ¶
func (*Event) ProtoMessage()
func (*Event) ProtoReflect ¶
func (x *Event) ProtoReflect() protoreflect.Message
type EventCodec ¶ added in v0.0.8
EventCodec is the type wrapper over the codec.Codec used for event encoding in wire injection.
type EventHandler ¶
EventHandler is an interface used for handling events.
type EventMessage ¶
type EventMessage interface {
MessageType() string
}
EventMessage is an interface that defines event messages.
type EventStore ¶
type EventStore interface {
// LoadEvents loads all events for given aggregate.
LoadEvents(ctx context.Context, aggregate Aggregate) error
// LoadEventsWithSnapshot loads the latest snapshot with the events that happened after it.
LoadEventsWithSnapshot(ctx context.Context, aggregate Aggregate) error
// Commit commits the event changes done in given aggregate.
Commit(ctx context.Context, aggregate Aggregate) error
// SaveSnapshot saves the snapshot of given aggregate.
SaveSnapshot(ctx context.Context, aggregate Aggregate) error
// StreamEvents opens an event stream that matches the given request.
StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
// StreamAggregates opens aggregate stream for given type and version.
StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)
// StreamProjections streams the projections based on given aggregate type and version.
StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)
// SetAggregateBase sets the AggregateBase within an aggregate.
SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
}
EventStore is an interface used by the event store to load, commit and create snapshot on aggregates.
type IdGenerator ¶
type IdGenerator interface {
GenerateId() string
}
IdGenerator is the interface used by identity generators.
type MockEventStore ¶ added in v0.0.11
type MockEventStore struct {
// contains filtered or unexported fields
}
MockEventStore is a mock of EventStore interface.
func NewMockEventStore ¶ added in v0.0.11
func NewMockEventStore(ctrl *gomock.Controller) *MockEventStore
NewMockEventStore creates a new mock instance.
func (*MockEventStore) Commit ¶ added in v0.0.11
func (m *MockEventStore) Commit(arg0 context.Context, arg1 Aggregate) error
Commit mocks base method.
func (*MockEventStore) EXPECT ¶ added in v0.0.11
func (m *MockEventStore) EXPECT() *MockEventStoreMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockEventStore) LoadEvents ¶ added in v0.0.11
func (m *MockEventStore) LoadEvents(arg0 context.Context, arg1 Aggregate) error
LoadEvents mocks base method.
func (*MockEventStore) LoadEventsWithSnapshot ¶ added in v0.0.11
func (m *MockEventStore) LoadEventsWithSnapshot(arg0 context.Context, arg1 Aggregate) error
LoadEventsWithSnapshot mocks base method.
func (*MockEventStore) SaveSnapshot ¶ added in v0.0.11
func (m *MockEventStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error
SaveSnapshot mocks base method.
func (*MockEventStore) SetAggregateBase ¶ added in v0.0.11
func (m *MockEventStore) SetAggregateBase(arg0 Aggregate, arg1, arg2 string, arg3 int64)
SetAggregateBase mocks base method.
func (*MockEventStore) StreamAggregates ¶ added in v0.0.11
func (m *MockEventStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)
StreamAggregates mocks base method.
func (*MockEventStore) StreamEvents ¶ added in v0.0.11
func (m *MockEventStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)
StreamEvents mocks base method.
func (*MockEventStore) StreamProjections ¶ added in v0.0.11
func (m *MockEventStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)
StreamProjections mocks base method.
type MockEventStoreMockRecorder ¶ added in v0.0.11
type MockEventStoreMockRecorder struct {
// contains filtered or unexported fields
}
MockEventStoreMockRecorder is the mock recorder for MockEventStore.
func (*MockEventStoreMockRecorder) Commit ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call
Commit indicates an expected call of Commit.
func (*MockEventStoreMockRecorder) LoadEvents ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) LoadEvents(arg0, arg1 interface{}) *gomock.Call
LoadEvents indicates an expected call of LoadEvents.
func (*MockEventStoreMockRecorder) LoadEventsWithSnapshot ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) LoadEventsWithSnapshot(arg0, arg1 interface{}) *gomock.Call
LoadEventsWithSnapshot indicates an expected call of LoadEventsWithSnapshot.
func (*MockEventStoreMockRecorder) SaveSnapshot ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call
SaveSnapshot indicates an expected call of SaveSnapshot.
func (*MockEventStoreMockRecorder) SetAggregateBase ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) SetAggregateBase(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
SetAggregateBase indicates an expected call of SetAggregateBase.
func (*MockEventStoreMockRecorder) StreamAggregates ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
StreamAggregates indicates an expected call of StreamAggregates.
func (*MockEventStoreMockRecorder) StreamEvents ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call
StreamEvents indicates an expected call of StreamEvents.
func (*MockEventStoreMockRecorder) StreamProjections ¶ added in v0.0.11
func (mr *MockEventStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call
StreamProjections indicates an expected call of StreamProjections.
type Projection ¶
Projection is an interface used to represent the query projection.
type ProjectionFactory ¶
type ProjectionFactory interface {
NewProjection(id string) Projection
}
ProjectionFactory is an interface used to create new projections.
type Snapshot ¶
type Snapshot struct {
AggregateId string `json:"aggregate_id,omitempty"`
AggregateType string `json:"aggregate_type,omitempty"`
AggregateVersion int64 `json:"aggregate_version,omitempty"`
Revision int64 `json:"revision,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
SnapshotData []byte `json:"snapshot_data,omitempty"`
}
Snapshot is a structure that defines the basic fields of the aggregate snapshot.
type SnapshotCodec ¶ added in v0.0.8
SnapshotCodec is the type wrapper over the codec.Codec used for wire injection.
type Storage ¶
type Storage interface {
// SaveEvents saves all input events atomically in the storage.
SaveEvents(ctx context.Context, es []*Event) error
// ListEvents lists all events for given aggregate type with given id.
ListEvents(ctx context.Context, aggId string, aggType string) ([]*Event, error)
// SaveSnapshot stores a snapshot.
SaveSnapshot(ctx context.Context, snap *Snapshot) error
// GetSnapshot gets the snapshot of the aggregate with its id, type and version.
GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error)
// ListEventsFromRevision gets the event stream for given aggregate id, type starting from given revision.
ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error)
// NewCursor creates a new cursor of given aggregate type and version.
NewCursor(ctx context.Context, aggType string, aggVersion int64) (Cursor, error)
// StreamEvents streams the events that match the given request.
StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
// As allows drivers to expose driver-specific types.
As(dst interface{}) error
}
Storage is the interface used by the event store as a storage for its events and snapshots.
type Store ¶ added in v0.0.8
type Store struct {
*AggregateBaseSetter
// contains filtered or unexported fields
}
Store is the default implementation for the EventStore interface.
func New ¶
func New(cfg *Config, eventCodec EventCodec, snapCodec SnapshotCodec, storage Storage) (*Store, error)
New creates new EventStore implementation.
func (*Store) CopyWithStorage ¶ added in v0.0.9
CopyWithStorage creates a copy of the Store structure that has a different storage. This function could be used to create transaction implementations.
func (*Store) LoadEvents ¶ added in v0.0.9
LoadEvents gets the event stream and applies on provided aggregate.
func (*Store) LoadEventsWithSnapshot ¶ added in v0.0.9
LoadEventsWithSnapshot gets the aggregate stream with the latest possible snapshot.
func (*Store) SaveSnapshot ¶ added in v0.0.8
SaveSnapshot stores the snapshot of the given aggregate.
func (*Store) StreamAggregates ¶ added in v0.0.8
func (e *Store) StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)
StreamAggregates opens up the aggregate streaming channel. The channel gets closed when there are no more aggregates to read or when the context is done. Closing the resulting channel would result in a panic.
func (*Store) StreamEvents ¶ added in v0.0.8
StreamEvents opens an event stream that matches given request.
func (*Store) StreamProjections ¶ added in v0.0.8
func (e *Store) StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)
StreamProjections streams the projection of given aggregate.
type StreamEventsRequest ¶
type StreamEventsRequest struct {
AggregateTypes []string
AggregateIDs []string
ExcludeEventTypes []string
EventTypes []string
BuffSize int
}
StreamEventsRequest is a request for the stream events query.
type UUIDGenerator ¶
type UUIDGenerator struct{}
UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.
func (UUIDGenerator) GenerateId ¶
func (u UUIDGenerator) GenerateId() string
GenerateId generates an identifier. Implements the IdGenerator interface.