eventsource

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2021 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package eventsource provides event store structures and abstractions.

Index

Constants

This section is empty.

Variables

View Source
var File_event_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

Aggregate is an interface used for the aggregate models

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is the base of the aggregate which

func (*AggregateBase) CommittedEvents

func (a *AggregateBase) CommittedEvents() []*Event

CommittedEvents gets the committed event messages.

func (*AggregateBase) DecodeEventAs

func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error

DecodeEventAs decodes provided in put eventData into the structure of eventMsg. THe eventMsg is expected to be a pointer to the event msg.

func (*AggregateBase) ID

func (a *AggregateBase) ID() string

ID gets the aggregate identifier.

func (*AggregateBase) LatestCommittedEvent

func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)

LatestCommittedEvent gets the latest committed event message.

func (*AggregateBase) MustLatestCommittedEvent

func (a *AggregateBase) MustLatestCommittedEvent() *Event

MustLatestCommittedEvent gets the latest committed event message or panics.

func (*AggregateBase) Revision

func (a *AggregateBase) Revision() int64

Revision gets aggregate current revision.

func (*AggregateBase) SetEvent

func (a *AggregateBase) SetEvent(eventMsg EventMessage) error

SetEvent sets new event message into given aggregate.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string)

SetID sets aggregate id.

func (*AggregateBase) Timestamp

func (a *AggregateBase) Timestamp() int64

Timestamp gets the aggregate base timestamp.

func (*AggregateBase) Type

func (a *AggregateBase) Type() string

Type gets the aggregate type.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int64

Version gets aggregate version.

type AggregateFactory

type AggregateFactory interface {
	New(aggType string, aggVersion int64) Aggregate
}

type Config

type Config struct {
	BufferSize    int
	EventCodec    codec.Codec
	SnapshotCodec codec.Codec
}

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig sets up the default config for the event store.

func (*Config) Validate

func (c *Config) Validate() error

type Cursor

type Cursor interface {
	OpenChannel(withSnapshot bool) (<-chan *CursorAggregate, error)
}

Cursor is an interface used by the storages that enables listing the aggregates.

type CursorAggregate

type CursorAggregate struct {
	AggregateID string
	Snapshot    *Snapshot
	Events      []*Event
}

CursorAggregate is an aggregate events and snapshot taken by the cursor.

type Event

type Event struct {
	EventId       string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
	EventType     string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"`
	AggregateId   string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"`
	EventData     []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"`
	Timestamp     int64  `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Revision      int64  `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"`
	// contains filtered or unexported fields
}

Event is the event source message model.

func (*Event) Copy

func (x *Event) Copy() *Event

Copy creates a copy of given event.

func (*Event) Descriptor deprecated

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetAggregateId

func (x *Event) GetAggregateId() string

func (*Event) GetAggregateType

func (x *Event) GetAggregateType() string

func (*Event) GetEventData

func (x *Event) GetEventData() []byte

func (*Event) GetEventId

func (x *Event) GetEventId() string

func (*Event) GetEventType

func (x *Event) GetEventType() string

func (*Event) GetRevision

func (x *Event) GetRevision() int64

func (*Event) GetTimestamp

func (x *Event) GetTimestamp() int64

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

func (*Event) Time

func (x *Event) Time() time.Time

type EventHandler

type EventHandler interface {
	Handle(ctx context.Context, e *Event)
}

EventHandler is an interface used for handling events.

type EventMessage

type EventMessage interface {
	MessageType() string
}

EventMessage is an interface that defines event messages.

type EventStore

type EventStore interface {
	SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
	LoadEventStream(ctx context.Context, aggregate Aggregate) error
	LoadEventStreamWithSnapshot(ctx context.Context, aggregate Aggregate) error
	Commit(ctx context.Context, aggregate Aggregate) error
	SaveSnapshot(ctx context.Context, aggregate Aggregate) error
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
	StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)
	StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)
}

EventStore is an interface used by the event store to load, commit and create snapshot on aggregates.

func New

func New(cfg *Config, storage Storage) (EventStore, error)

New creates new EventStore implementation.

type IdGenerator

type IdGenerator interface {
	GenerateId() string
}

IdGenerator is the interface used by identity generators.

type MockStore

type MockStore struct {
	// contains filtered or unexported fields
}

MockStore is a mock of Store interface.

func NewDefaultMockStore

func NewDefaultMockStore(ctrl *gomock.Controller) *MockStore

NewDefaultMockStore creates new default mock store.

func NewMockStore

func NewMockStore(ctrl *gomock.Controller, eventCodec, snapCodec codec.Codec, idGen IdGenerator) *MockStore

NewMockStore creates a new mock instance.

func (*MockStore) Commit

func (m *MockStore) Commit(arg0 context.Context, arg1 Aggregate) error

Commit mocks base method.

func (*MockStore) EXPECT

func (m *MockStore) EXPECT() *MockStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockStore) LoadEventStream

func (m *MockStore) LoadEventStream(arg0 context.Context, arg1 Aggregate) error

LoadEventStream mocks base method.

func (*MockStore) LoadEventStreamWithSnapshot

func (m *MockStore) LoadEventStreamWithSnapshot(arg0 context.Context, arg1 Aggregate) error

LoadEventStreamWithSnapshot mocks base method.

func (*MockStore) SaveSnapshot

func (m *MockStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error

SaveSnapshot mocks base method.

func (MockStore) SetAggregateBase

func (a MockStore) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)

SetAggregateBase implements AggregateBaseSetter interface.

func (*MockStore) StreamAggregates

func (m *MockStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)

StreamAggregates mocks base method.

func (*MockStore) StreamEvents

func (m *MockStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)

StreamEvents mocks base method.

func (*MockStore) StreamProjections

func (m *MockStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)

StreamProjections mocks base method.

type MockStoreMockRecorder

type MockStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockStoreMockRecorder is the mock recorder for MockStore.

func (*MockStoreMockRecorder) Commit

func (mr *MockStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call

Commit indicates an expected call of Commit.

func (*MockStoreMockRecorder) LoadEventStream

func (mr *MockStoreMockRecorder) LoadEventStream(arg0, arg1 interface{}) *gomock.Call

LoadEventStream indicates an expected call of LoadEventStream.

func (*MockStoreMockRecorder) LoadEventStreamWithSnapshot

func (mr *MockStoreMockRecorder) LoadEventStreamWithSnapshot(arg0, arg1 interface{}) *gomock.Call

LoadEventStreamWithSnapshot indicates an expected call of LoadEventStreamWithSnapshot.

func (*MockStoreMockRecorder) SaveSnapshot

func (mr *MockStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call

SaveSnapshot indicates an expected call of SaveSnapshot.

func (*MockStoreMockRecorder) StreamAggregates

func (mr *MockStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamAggregates indicates an expected call of StreamAggregates.

func (*MockStoreMockRecorder) StreamEvents

func (mr *MockStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call

StreamEvents indicates an expected call of StreamEvents.

func (*MockStoreMockRecorder) StreamProjections

func (mr *MockStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamProjections indicates an expected call of StreamProjections.

type Projection

type Projection interface {
	Apply(c codec.Codec, e *Event) error
}

Projection is an interface used to represent the query projeciton.

type ProjectionFactory

type ProjectionFactory interface {
	NewProjection(id string) Projection
}

ProjectionFactory is an interface used to create new projections.

type Snapshot

type Snapshot struct {
	AggregateId      string `json:"aggregate_id,omitempty"`
	AggregateType    string `json:"aggregate_type,omitempty"`
	AggregateVersion int64  `json:"aggregate_version,omitempty"`
	Revision         int64  `json:"revision,omitempty"`
	Timestamp        int64  `json:"timestamp,omitempty"`
	SnapshotData     []byte `json:"snapshot_data,omitempty"`
}

Snapshot is a structure that define basic fields of the aggregate snapshot.

type Storage

type Storage interface {
	SaveEvents(ctx context.Context, es []*Event) error
	GetEventStream(ctx context.Context, aggId string, aggType string) ([]*Event, error)
	SaveSnapshot(ctx context.Context, snap *Snapshot) error
	GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error)
	GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error)
	NewCursor(ctx context.Context, aggType string, aggVersion int64) (Cursor, error)
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
}

Storage is the interface used by the event store as a storage for its events and snapshots.

type StreamEventsRequest

type StreamEventsRequest struct {
	AggregateTypes    []string
	AggregateIDs      []string
	ExcludeEventTypes []string
	EventTypes        []string
	BuffSize          int
}

type UUIDGenerator

type UUIDGenerator struct{}

UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.

func (UUIDGenerator) GenerateId

func (u UUIDGenerator) GenerateId() string

GenerateId generates identified. Implements IdGenerator interface.

Directories

Path Synopsis
Package mockes is a generated GoMock package.
Package mockes is a generated GoMock package.
sqlxes module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL