eventsource

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 24, 2021 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package eventsource provides event store structures and abstractions.

Index

Constants

This section is empty.

Variables

View Source
var File_event_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

Aggregate is an interface used for the aggregate models

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is the base of the aggregate which

func (*AggregateBase) CommittedEvents

func (a *AggregateBase) CommittedEvents() []*Event

CommittedEvents gets the committed event messages.

func (*AggregateBase) DecodeEventAs

func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error

DecodeEventAs decodes provided in put eventData into the structure of eventMsg. THe eventMsg is expected to be a pointer to the event msg.

func (*AggregateBase) ID

func (a *AggregateBase) ID() string

ID gets the aggregate identifier.

func (*AggregateBase) LatestCommittedEvent

func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)

LatestCommittedEvent gets the latest committed event message.

func (*AggregateBase) MustLatestCommittedEvent

func (a *AggregateBase) MustLatestCommittedEvent() *Event

MustLatestCommittedEvent gets the latest committed event message or panics.

func (*AggregateBase) Revision

func (a *AggregateBase) Revision() int64

Revision gets aggregate current revision.

func (*AggregateBase) SetEvent

func (a *AggregateBase) SetEvent(eventMsg EventMessage) error

SetEvent sets new event message into given aggregate.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string)

SetID sets aggregate id.

func (*AggregateBase) Timestamp

func (a *AggregateBase) Timestamp() int64

Timestamp gets the aggregate base timestamp.

func (*AggregateBase) Type

func (a *AggregateBase) Type() string

Type gets the aggregate type.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int64

Version gets aggregate version.

type AggregateFactory

type AggregateFactory interface {
	New(aggType string, aggVersion int64) Aggregate
}

type Config

type Config struct {
	BufferSize int
}

Config is the configuration for the eventsource storage.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig sets up the default config for the event store.

func (*Config) Validate

func (c *Config) Validate() error

type Cursor

type Cursor interface {
	OpenChannel(withSnapshot bool) (<-chan *CursorAggregate, error)
}

Cursor is an interface used by the storages that enables listing the aggregates.

type CursorAggregate

type CursorAggregate struct {
	AggregateID string
	Snapshot    *Snapshot
	Events      []*Event
}

CursorAggregate is an aggregate events and snapshot taken by the cursor.

type Event

type Event struct {
	EventId       string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
	EventType     string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"`
	AggregateId   string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"`
	EventData     []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"`
	Timestamp     int64  `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Revision      int64  `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"`
	// contains filtered or unexported fields
}

Event is the event source message model.

func (*Event) Copy

func (x *Event) Copy() *Event

Copy creates a copy of given event.

func (*Event) Descriptor deprecated

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetAggregateId

func (x *Event) GetAggregateId() string

func (*Event) GetAggregateType

func (x *Event) GetAggregateType() string

func (*Event) GetEventData

func (x *Event) GetEventData() []byte

func (*Event) GetEventId

func (x *Event) GetEventId() string

func (*Event) GetEventType

func (x *Event) GetEventType() string

func (*Event) GetRevision

func (x *Event) GetRevision() int64

func (*Event) GetTimestamp

func (x *Event) GetTimestamp() int64

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

func (*Event) Time

func (x *Event) Time() time.Time

type EventCodec added in v0.0.8

type EventCodec codec.Codec

EventCodec is the type wrapper over the codec.Codec used for event encoding in wire injection.

type EventHandler

type EventHandler interface {
	Handle(ctx context.Context, e *Event)
}

EventHandler is an interface used for handling events.

type EventMessage

type EventMessage interface {
	MessageType() string
}

EventMessage is an interface that defines event messages.

type EventStore

type EventStore interface {
	SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
	LoadEventStream(ctx context.Context, aggregate Aggregate) error
	LoadEventStreamWithSnapshot(ctx context.Context, aggregate Aggregate) error
	Commit(ctx context.Context, aggregate Aggregate) error
	SaveSnapshot(ctx context.Context, aggregate Aggregate) error
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
	StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)
	StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)
}

EventStore is an interface used by the event store to load, commit and create snapshot on aggregates.

type IdGenerator

type IdGenerator interface {
	GenerateId() string
}

IdGenerator is the interface used by identity generators.

type MockStore

type MockStore struct {
	// contains filtered or unexported fields
}

MockStore is a mock of Store interface.

func NewDefaultMockStore

func NewDefaultMockStore(ctrl *gomock.Controller) *MockStore

NewDefaultMockStore creates new default mock store.

func NewMockStore

func NewMockStore(ctrl *gomock.Controller, eventCodec, snapCodec codec.Codec, idGen IdGenerator) *MockStore

NewMockStore creates a new mock instance.

func (*MockStore) Commit

func (m *MockStore) Commit(arg0 context.Context, arg1 Aggregate) error

Commit mocks base method.

func (*MockStore) EXPECT

func (m *MockStore) EXPECT() *MockStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockStore) LoadEventStream

func (m *MockStore) LoadEventStream(arg0 context.Context, arg1 Aggregate) error

LoadEventStream mocks base method.

func (*MockStore) LoadEventStreamWithSnapshot

func (m *MockStore) LoadEventStreamWithSnapshot(arg0 context.Context, arg1 Aggregate) error

LoadEventStreamWithSnapshot mocks base method.

func (*MockStore) SaveSnapshot

func (m *MockStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error

SaveSnapshot mocks base method.

func (MockStore) SetAggregateBase

func (a MockStore) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)

SetAggregateBase implements AggregateBaseSetter interface.

func (*MockStore) StreamAggregates

func (m *MockStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)

StreamAggregates mocks base method.

func (*MockStore) StreamEvents

func (m *MockStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)

StreamEvents mocks base method.

func (*MockStore) StreamProjections

func (m *MockStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)

StreamProjections mocks base method.

type MockStoreMockRecorder

type MockStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockStoreMockRecorder is the mock recorder for MockStore.

func (*MockStoreMockRecorder) Commit

func (mr *MockStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call

Commit indicates an expected call of Commit.

func (*MockStoreMockRecorder) LoadEventStream

func (mr *MockStoreMockRecorder) LoadEventStream(arg0, arg1 interface{}) *gomock.Call

LoadEventStream indicates an expected call of LoadEventStream.

func (*MockStoreMockRecorder) LoadEventStreamWithSnapshot

func (mr *MockStoreMockRecorder) LoadEventStreamWithSnapshot(arg0, arg1 interface{}) *gomock.Call

LoadEventStreamWithSnapshot indicates an expected call of LoadEventStreamWithSnapshot.

func (*MockStoreMockRecorder) SaveSnapshot

func (mr *MockStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call

SaveSnapshot indicates an expected call of SaveSnapshot.

func (*MockStoreMockRecorder) StreamAggregates

func (mr *MockStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamAggregates indicates an expected call of StreamAggregates.

func (*MockStoreMockRecorder) StreamEvents

func (mr *MockStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call

StreamEvents indicates an expected call of StreamEvents.

func (*MockStoreMockRecorder) StreamProjections

func (mr *MockStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamProjections indicates an expected call of StreamProjections.

type Projection

type Projection interface {
	Apply(c codec.Codec, e *Event) error
}

Projection is an interface used to represent the query projeciton.

type ProjectionFactory

type ProjectionFactory interface {
	NewProjection(id string) Projection
}

ProjectionFactory is an interface used to create new projections.

type Snapshot

type Snapshot struct {
	AggregateId      string `json:"aggregate_id,omitempty"`
	AggregateType    string `json:"aggregate_type,omitempty"`
	AggregateVersion int64  `json:"aggregate_version,omitempty"`
	Revision         int64  `json:"revision,omitempty"`
	Timestamp        int64  `json:"timestamp,omitempty"`
	SnapshotData     []byte `json:"snapshot_data,omitempty"`
}

Snapshot is a structure that define basic fields of the aggregate snapshot.

type SnapshotCodec added in v0.0.8

type SnapshotCodec codec.Codec

SnapshotCodec is the type wrapper over the codec.Codec used for wire injection.

type Storage

type Storage interface {
	SaveEvents(ctx context.Context, es []*Event) error
	GetEventStream(ctx context.Context, aggId string, aggType string) ([]*Event, error)
	SaveSnapshot(ctx context.Context, snap *Snapshot) error
	GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error)
	GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error)
	NewCursor(ctx context.Context, aggType string, aggVersion int64) (Cursor, error)
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)
}

Storage is the interface used by the event store as a storage for its events and snapshots.

type Store added in v0.0.8

type Store struct {
	// contains filtered or unexported fields
}

Store is the default implementation for the EventStore interface.

func New

func New(cfg *Config, eventCodec EventCodec, snapCodec SnapshotCodec, storage Storage) (*Store, error)

New creates new EventStore implementation.

func (*Store) Commit added in v0.0.8

func (e *Store) Commit(ctx context.Context, agg Aggregate) error

Commit commits provided aggregate events. In case when the

func (*Store) LoadEventStream added in v0.0.8

func (e *Store) LoadEventStream(ctx context.Context, agg Aggregate) error

LoadEventStream gets the event stream and applies on provided aggregate.

func (*Store) LoadEventStreamWithSnapshot added in v0.0.8

func (e *Store) LoadEventStreamWithSnapshot(ctx context.Context, agg Aggregate) error

LoadEventStreamWithSnapshot gets the aggregate stream with the latest possible snapshot.

func (*Store) SaveSnapshot added in v0.0.8

func (e *Store) SaveSnapshot(ctx context.Context, agg Aggregate) error

SaveSnapshot stores the snapshot

func (Store) SetAggregateBase added in v0.0.8

func (a Store) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)

SetAggregateBase implements AggregateBaseSetter interface.

func (*Store) StreamAggregates added in v0.0.8

func (e *Store) StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)

StreamAggregates opens up the aggregate streaming channel. The channel would got closed when there is no more aggregate to read or when the context is done. Closing resulting channel would result with a panic.

func (*Store) StreamEvents added in v0.0.8

func (e *Store) StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

func (*Store) StreamProjections added in v0.0.8

func (e *Store) StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)

func (*Store) WithStorage added in v0.0.8

func (e *Store) WithStorage(storage Storage) *Store

WithStorage creates a copy of the Store structure that has a different storage. This function could be used to create transaction implementations.

type StreamEventsRequest

type StreamEventsRequest struct {
	AggregateTypes    []string
	AggregateIDs      []string
	ExcludeEventTypes []string
	EventTypes        []string
	BuffSize          int
}

StreamEventsRequest is a request for the stream events query.

type UUIDGenerator

type UUIDGenerator struct{}

UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.

func (UUIDGenerator) GenerateId

func (u UUIDGenerator) GenerateId() string

GenerateId generates identified. Implements IdGenerator interface.

Directories

Path Synopsis
Package mockes is a generated GoMock package.
Package mockes is a generated GoMock package.
sqlxes module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL