eventsource

package
v0.0.9
Published: May 24, 2021 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package eventsource provides event store structures and abstractions.

Index

Constants

This section is empty.

Variables

var File_event_proto protoreflect.FileDescriptor

Functions

This section is empty.

Types

type Aggregate

type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

Aggregate is the interface implemented by aggregate models.
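A minimal sketch of a type that satisfies Aggregate follows. Event and AggregateBase are local stand-ins so the example compiles on its own; the Account type, the "account:deposited" event type and the JSON payload encoding are assumptions made for illustration, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event and AggregateBase are local stand-ins for the package types,
// reduced to what this sketch needs.
type Event struct {
	EventType string
	EventData []byte
}

type AggregateBase struct{ id string }

// Aggregate repeats the interface documented above.
type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

// Account is a hypothetical aggregate that tracks a balance.
type Account struct {
	base    *AggregateBase
	Balance int64
}

// deposited is a hypothetical event payload, JSON-encoded in EventData.
type deposited struct {
	Amount int64 `json:"amount"`
}

func (a *Account) SetBase(base *AggregateBase) { a.base = base }
func (a *Account) AggBase() *AggregateBase     { return a.base }

// Reset clears the derived state but keeps the base.
func (a *Account) Reset() { a.Balance = 0 }

// Apply folds a single event into the aggregate state.
func (a *Account) Apply(e *Event) error {
	switch e.EventType {
	case "account:deposited":
		var msg deposited
		if err := json.Unmarshal(e.EventData, &msg); err != nil {
			return err
		}
		a.Balance += msg.Amount
		return nil
	default:
		return fmt.Errorf("unknown event type %q", e.EventType)
	}
}

// depositEvent builds a stand-in event carrying the given amount.
func depositEvent(amount int64) *Event {
	data, err := json.Marshal(deposited{Amount: amount})
	if err != nil {
		panic(err) // cannot happen for this payload type
	}
	return &Event{EventType: "account:deposited", EventData: data}
}

func main() {
	var agg Aggregate = &Account{}
	agg.SetBase(&AggregateBase{id: "acc-1"})
	for _, amount := range []int64{10, 32} {
		if err := agg.Apply(depositEvent(amount)); err != nil {
			panic(err)
		}
	}
	fmt.Println(agg.(*Account).Balance) // prints 42
}
```

Keeping Apply free of side effects other than state changes lets the same method serve both replay from storage and freshly recorded events.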

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase is the base of the aggregate.

func (*AggregateBase) CommittedEvents

func (a *AggregateBase) CommittedEvents() []*Event

CommittedEvents gets the committed event messages.

func (*AggregateBase) DecodeEventAs

func (a *AggregateBase) DecodeEventAs(eventData []byte, eventMsg interface{}) error

DecodeEventAs decodes the provided input eventData into the structure of eventMsg. The eventMsg is expected to be a pointer to the event message.

func (*AggregateBase) ID

func (a *AggregateBase) ID() string

ID gets the aggregate identifier.

func (*AggregateBase) LatestCommittedEvent

func (a *AggregateBase) LatestCommittedEvent() (*Event, bool)

LatestCommittedEvent gets the latest committed event message.

func (*AggregateBase) MustLatestCommittedEvent

func (a *AggregateBase) MustLatestCommittedEvent() *Event

MustLatestCommittedEvent gets the latest committed event message or panics.

func (*AggregateBase) Revision

func (a *AggregateBase) Revision() int64

Revision gets the aggregate's current revision.

func (*AggregateBase) SetEvent

func (a *AggregateBase) SetEvent(eventMsg EventMessage) error

SetEvent sets a new event message on the given aggregate.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string)

SetID sets aggregate id.

func (*AggregateBase) Timestamp

func (a *AggregateBase) Timestamp() int64

Timestamp gets the aggregate base timestamp.

func (*AggregateBase) Type

func (a *AggregateBase) Type() string

Type gets the aggregate type.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int64

Version gets aggregate version.

type AggregateFactory

type AggregateFactory interface {
	New(aggType string, aggVersion int64) Aggregate
}
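AggregateFactory has no description on this page. One plausible reading, sketched below, is that the store asks the factory for an empty aggregate of a given type and version before replaying events into it. The orderV1 and orderV2 models, the "order" type name and the choice to return nil for unknown input are all assumptions; the interfaces are redeclared locally so the example is self-contained.

```go
package main

import "fmt"

// Local stand-ins for the package types.
type Event struct{ EventType string }
type AggregateBase struct{}

type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

type AggregateFactory interface {
	New(aggType string, aggVersion int64) Aggregate
}

// orderV1 is a hypothetical first version of an "order" aggregate.
type orderV1 struct {
	base  *AggregateBase
	Lines int
}

func (o *orderV1) Apply(e *Event) error     { o.Lines++; return nil }
func (o *orderV1) SetBase(b *AggregateBase) { o.base = b }
func (o *orderV1) AggBase() *AggregateBase  { return o.base }
func (o *orderV1) Reset()                   { o.Lines = 0 }

// orderV2 is a hypothetical second version that also tracks cancellation.
type orderV2 struct {
	orderV1
	Cancelled bool
}

func (o *orderV2) Apply(e *Event) error {
	if e.EventType == "order:cancelled" {
		o.Cancelled = true
		return nil
	}
	return o.orderV1.Apply(e)
}

func (o *orderV2) Reset() {
	o.orderV1.Reset()
	o.Cancelled = false
}

// orderFactory picks the model that matches the requested type and version.
type orderFactory struct{}

func (orderFactory) New(aggType string, aggVersion int64) Aggregate {
	if aggType != "order" {
		return nil // assumption: how the store treats nil is not documented here
	}
	switch aggVersion {
	case 1:
		return &orderV1{}
	case 2:
		return &orderV2{}
	}
	return nil
}

func main() {
	var f AggregateFactory = orderFactory{}
	fmt.Printf("%T %T\n", f.New("order", 1), f.New("order", 2))
}
```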

type Config

type Config struct {
	BufferSize int
}

Config is the configuration for the eventsource storage.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig sets up the default config for the event store.

func (*Config) Validate

func (c *Config) Validate() error

type Cursor

type Cursor interface {
	// GetAggregateStream gets the stream of aggregates.
	GetAggregateStream(withSnapshot bool) (<-chan *CursorAggregate, error)
}

Cursor is an interface used by the storages that enables listing the aggregates.

type CursorAggregate

type CursorAggregate struct {
	AggregateID string
	Snapshot    *Snapshot
	Events      []*Event
}

CursorAggregate holds the events and the snapshot of a single aggregate, as taken by the cursor.
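The sketch below shows one way a Cursor could be implemented and consumed. The slice-backed sliceCursor and the countEvents helper are hypothetical, the types are local stand-ins, and closing the channel once all aggregates are sent is an assumption about how a driver signals the end of the stream.

```go
package main

import "fmt"

// Local stand-ins for the package types.
type Snapshot struct{ Revision int64 }
type Event struct{ Revision int64 }

type CursorAggregate struct {
	AggregateID string
	Snapshot    *Snapshot
	Events      []*Event
}

type Cursor interface {
	GetAggregateStream(withSnapshot bool) (<-chan *CursorAggregate, error)
}

// sliceCursor is a hypothetical cursor backed by an in-memory slice.
type sliceCursor struct{ items []*CursorAggregate }

func (c *sliceCursor) GetAggregateStream(withSnapshot bool) (<-chan *CursorAggregate, error) {
	out := make(chan *CursorAggregate)
	go func() {
		defer close(out) // assumed end-of-stream signal
		for _, item := range c.items {
			cp := *item
			if !withSnapshot {
				cp.Snapshot = nil
			}
			out <- &cp
		}
	}()
	return out, nil
}

// countEvents drains the stream and counts events per aggregate.
func countEvents(c Cursor, withSnapshot bool) (map[string]int, error) {
	stream, err := c.GetAggregateStream(withSnapshot)
	if err != nil {
		return nil, err
	}
	counts := make(map[string]int)
	for agg := range stream {
		counts[agg.AggregateID] += len(agg.Events)
	}
	return counts, nil
}

func main() {
	c := &sliceCursor{items: []*CursorAggregate{
		{AggregateID: "a", Events: []*Event{{Revision: 1}, {Revision: 2}}},
		{AggregateID: "b", Snapshot: &Snapshot{Revision: 3}, Events: []*Event{{Revision: 4}}},
	}}
	counts, err := countEvents(c, true)
	if err != nil {
		panic(err)
	}
	fmt.Println(counts["a"], counts["b"]) // prints 2 1
}
```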

type Event

type Event struct {
	EventId       string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
	EventType     string `protobuf:"bytes,2,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	AggregateType string `protobuf:"bytes,3,opt,name=aggregate_type,json=aggregateType,proto3" json:"aggregate_type,omitempty"`
	AggregateId   string `protobuf:"bytes,4,opt,name=aggregate_id,json=aggregateId,proto3" json:"aggregate_id,omitempty"`
	EventData     []byte `protobuf:"bytes,5,opt,name=event_data,json=eventData,proto3" json:"event_data,omitempty"`
	Timestamp     int64  `protobuf:"varint,6,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Revision      int64  `protobuf:"varint,7,opt,name=revision,proto3" json:"revision,omitempty"`
	// contains filtered or unexported fields
}

Event is the event source message model.

func (*Event) Copy

func (x *Event) Copy() *Event

Copy creates a copy of given event.

func (*Event) Descriptor deprecated

func (*Event) Descriptor() ([]byte, []int)

Deprecated: Use Event.ProtoReflect.Descriptor instead.

func (*Event) GetAggregateId

func (x *Event) GetAggregateId() string

func (*Event) GetAggregateType

func (x *Event) GetAggregateType() string

func (*Event) GetEventData

func (x *Event) GetEventData() []byte

func (*Event) GetEventId

func (x *Event) GetEventId() string

func (*Event) GetEventType

func (x *Event) GetEventType() string

func (*Event) GetRevision

func (x *Event) GetRevision() int64

func (*Event) GetTimestamp

func (x *Event) GetTimestamp() int64

func (*Event) ProtoMessage

func (*Event) ProtoMessage()

func (*Event) ProtoReflect

func (x *Event) ProtoReflect() protoreflect.Message

func (*Event) Reset

func (x *Event) Reset()

func (*Event) String

func (x *Event) String() string

func (*Event) Time

func (x *Event) Time() time.Time

type EventCodec added in v0.0.8

type EventCodec codec.Codec

EventCodec is the type wrapper over the codec.Codec used for event encoding in wire injection.

type EventHandler

type EventHandler interface {
	Handle(ctx context.Context, e *Event)
}

EventHandler is an interface used for handling events.
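As a rough illustration, the following sketch implements EventHandler twice: once as a struct and once through a function adapter. HandlerFunc, typeCounter, dispatch and replay are hypothetical helpers that do not exist in the package, and Event is a local stand-in.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a local stand-in for the package type.
type Event struct{ EventType string }

type EventHandler interface {
	Handle(ctx context.Context, e *Event)
}

// HandlerFunc is a hypothetical adapter that lets a plain function act as a handler.
type HandlerFunc func(ctx context.Context, e *Event)

func (f HandlerFunc) Handle(ctx context.Context, e *Event) { f(ctx, e) }

// typeCounter is a hypothetical handler counting events per type.
// It is not safe for concurrent use.
type typeCounter struct{ counts map[string]int }

func (t *typeCounter) Handle(ctx context.Context, e *Event) {
	if ctx.Err() != nil {
		return // Handle returns no error, so a cancelled context is simply skipped
	}
	t.counts[e.EventType]++
}

// dispatch passes every event to every handler, in order.
func dispatch(ctx context.Context, events []*Event, handlers ...EventHandler) {
	for _, e := range events {
		for _, h := range handlers {
			h.Handle(ctx, e)
		}
	}
}

// replay runs both handlers over the events and returns what they observed.
func replay(events []*Event) (map[string]int, []string) {
	counter := &typeCounter{counts: make(map[string]int)}
	var seen []string
	logger := HandlerFunc(func(_ context.Context, e *Event) {
		seen = append(seen, e.EventType)
	})
	dispatch(context.Background(), events, counter, logger)
	return counter.counts, seen
}

func main() {
	counts, seen := replay([]*Event{
		{EventType: "created"}, {EventType: "renamed"}, {EventType: "renamed"},
	})
	fmt.Println(counts["renamed"], len(seen)) // prints 2 3
}
```

Because Handle has no error result, a handler that can fail needs its own reporting path, such as logging or a retry queue.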

type EventMessage

type EventMessage interface {
	MessageType() string
}

EventMessage is an interface that defines event messages.
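A small sketch of two message types satisfying EventMessage is shown below, together with a lookup that returns an empty message for a given type name. The message types, their names and the newMessage registry are assumptions for illustration; the lookup returns pointers because DecodeEventAs expects a pointer to the event message.

```go
package main

import "fmt"

type EventMessage interface {
	MessageType() string
}

// UserRegistered and UserRenamed are hypothetical event messages.
type UserRegistered struct{ Email string }

func (*UserRegistered) MessageType() string { return "user:registered" }

type UserRenamed struct{ Name string }

func (*UserRenamed) MessageType() string { return "user:renamed" }

// newMessage is a hypothetical registry: it maps a stored type name
// to an empty message that a decoder can fill in.
func newMessage(messageType string) (EventMessage, error) {
	switch messageType {
	case "user:registered":
		return &UserRegistered{}, nil
	case "user:renamed":
		return &UserRenamed{}, nil
	default:
		return nil, fmt.Errorf("unregistered message type %q", messageType)
	}
}

func main() {
	msg, err := newMessage("user:renamed")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %s\n", msg, msg.MessageType())
}
```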

type EventStore

type EventStore interface {
	// LoadEvents loads all events for given aggregate.
	LoadEvents(ctx context.Context, aggregate Aggregate) error

	// LoadEventsWithSnapshot loads the latest snapshot with the events that happened after it.
	LoadEventsWithSnapshot(ctx context.Context, aggregate Aggregate) error

	// Commit commits the event changes done in given aggregate.
	Commit(ctx context.Context, aggregate Aggregate) error

	// SaveSnapshot saves the snapshot of given aggregate.
	SaveSnapshot(ctx context.Context, aggregate Aggregate) error

	// StreamEvents opens a stream of events that match the given request.
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

	// StreamAggregates opens aggregate stream for given type and version.
	StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)

	// StreamProjections streams the projections based on given aggregate type and version.
	StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)

	// SetAggregateBase sets the AggregateBase within an aggregate.
	SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)
}

EventStore is an interface used to load, commit, and create snapshots of aggregates.
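The usual flow with such a store is load, change, commit. The sketch below walks through it with a hypothetical in-memory memStore that covers only LoadEvents and Commit. AggregateBase is a stand-in with visible fields, and the Increment method plays the role that SetEvent has in the real package; none of this reflects the actual Store internals.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for the package types.
type Event struct {
	AggregateId string
	EventType   string
	Revision    int64
}

type AggregateBase struct {
	id          string
	revision    int64
	uncommitted []*Event
}

type Aggregate interface {
	Apply(e *Event) error
	SetBase(base *AggregateBase)
	AggBase() *AggregateBase
	Reset()
}

// memStore is a hypothetical in-memory store with two of the EventStore methods.
type memStore struct{ events map[string][]*Event }

// LoadEvents resets the aggregate and replays its stored events.
func (s *memStore) LoadEvents(ctx context.Context, agg Aggregate) error {
	agg.Reset()
	base := agg.AggBase()
	base.revision = 0
	for _, e := range s.events[base.id] {
		if err := agg.Apply(e); err != nil {
			return err
		}
		base.revision = e.Revision
	}
	return nil
}

// Commit persists the events recorded since the last load.
func (s *memStore) Commit(ctx context.Context, agg Aggregate) error {
	base := agg.AggBase()
	s.events[base.id] = append(s.events[base.id], base.uncommitted...)
	base.uncommitted = nil
	return nil
}

// Counter is a hypothetical aggregate.
type Counter struct {
	base *AggregateBase
	N    int
}

func (c *Counter) Apply(e *Event) error {
	if e.EventType == "counter:incremented" {
		c.N++
	}
	return nil
}
func (c *Counter) SetBase(b *AggregateBase) { c.base = b }
func (c *Counter) AggBase() *AggregateBase  { return c.base }
func (c *Counter) Reset()                   { c.N = 0 }

// Increment records a new event on the base and applies it locally.
func (c *Counter) Increment() error {
	c.base.revision++
	e := &Event{AggregateId: c.base.id, EventType: "counter:incremented", Revision: c.base.revision}
	c.base.uncommitted = append(c.base.uncommitted, e)
	return c.Apply(e)
}

// incrementOnce performs one load, change, commit cycle.
func incrementOnce(ctx context.Context, s *memStore, id string) (int, error) {
	c := &Counter{}
	c.SetBase(&AggregateBase{id: id})
	if err := s.LoadEvents(ctx, c); err != nil {
		return 0, err
	}
	if err := c.Increment(); err != nil {
		return 0, err
	}
	if err := s.Commit(ctx, c); err != nil {
		return 0, err
	}
	return c.N, nil
}

// runTwice shows that the second cycle sees the first cycle's event.
func runTwice(s *memStore, id string) (int, int, error) {
	ctx := context.Background()
	first, err := incrementOnce(ctx, s, id)
	if err != nil {
		return 0, 0, err
	}
	second, err := incrementOnce(ctx, s, id)
	return first, second, err
}

func main() {
	s := &memStore{events: map[string][]*Event{}}
	first, second, err := runTwice(s, "c-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(first, second) // prints 1 2
}
```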

type IdGenerator

type IdGenerator interface {
	GenerateId() string
}

IdGenerator is the interface used by identity generators.
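Any type with a GenerateId method can serve as a generator. The sketch below shows two hypothetical implementations: a random one built on crypto/rand and a deterministic one that is convenient in tests. Neither is part of the package.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

type IdGenerator interface {
	GenerateId() string
}

// hexGenerator is a hypothetical generator returning 16 random bytes as hex.
type hexGenerator struct{}

func (hexGenerator) GenerateId() string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err) // the interface leaves no room to return an error
	}
	return hex.EncodeToString(b[:])
}

// sequenceGenerator is a hypothetical deterministic generator for tests.
type sequenceGenerator struct{ next int }

func (s *sequenceGenerator) GenerateId() string {
	s.next++
	return fmt.Sprintf("test-%d", s.next)
}

func main() {
	gens := []IdGenerator{hexGenerator{}, &sequenceGenerator{}}
	for _, g := range gens {
		fmt.Println(g.GenerateId())
	}
}
```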

type MockStore

type MockStore struct {
	// contains filtered or unexported fields
}

MockStore is a mock of Store interface.

func NewDefaultMockStore

func NewDefaultMockStore(ctrl *gomock.Controller) *MockStore

NewDefaultMockStore creates a new default mock store.

func NewMockStore

func NewMockStore(ctrl *gomock.Controller, eventCodec, snapCodec codec.Codec, idGen IdGenerator) *MockStore

NewMockStore creates a new mock instance.

func (*MockStore) Commit

func (m *MockStore) Commit(arg0 context.Context, arg1 Aggregate) error

Commit mocks base method.

func (*MockStore) EXPECT

func (m *MockStore) EXPECT() *MockStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockStore) LoadEventStream

func (m *MockStore) LoadEventStream(arg0 context.Context, arg1 Aggregate) error

LoadEventStream mocks base method.

func (*MockStore) LoadEventStreamWithSnapshot

func (m *MockStore) LoadEventStreamWithSnapshot(arg0 context.Context, arg1 Aggregate) error

LoadEventStreamWithSnapshot mocks base method.

func (*MockStore) SaveSnapshot

func (m *MockStore) SaveSnapshot(arg0 context.Context, arg1 Aggregate) error

SaveSnapshot mocks base method.

func (MockStore) SetAggregateBase

func (a MockStore) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)

SetAggregateBase implements AggregateBaseSetter interface.

func (*MockStore) StreamAggregates

func (m *MockStore) StreamAggregates(arg0 context.Context, arg1 string, arg2 int64, arg3 AggregateFactory) (<-chan Aggregate, error)

StreamAggregates mocks base method.

func (*MockStore) StreamEvents

func (m *MockStore) StreamEvents(arg0 context.Context, arg1 *StreamEventsRequest) (<-chan *Event, error)

StreamEvents mocks base method.

func (*MockStore) StreamProjections

func (m *MockStore) StreamProjections(arg0 context.Context, arg1 string, arg2 int64, arg3 ProjectionFactory) (<-chan Projection, error)

StreamProjections mocks base method.

type MockStoreMockRecorder

type MockStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockStoreMockRecorder is the mock recorder for MockStore.

func (*MockStoreMockRecorder) Commit

func (mr *MockStoreMockRecorder) Commit(arg0, arg1 interface{}) *gomock.Call

Commit indicates an expected call of Commit.

func (*MockStoreMockRecorder) LoadEventStream

func (mr *MockStoreMockRecorder) LoadEventStream(arg0, arg1 interface{}) *gomock.Call

LoadEventStream indicates an expected call of LoadEventStream.

func (*MockStoreMockRecorder) LoadEventStreamWithSnapshot

func (mr *MockStoreMockRecorder) LoadEventStreamWithSnapshot(arg0, arg1 interface{}) *gomock.Call

LoadEventStreamWithSnapshot indicates an expected call of LoadEventStreamWithSnapshot.

func (*MockStoreMockRecorder) SaveSnapshot

func (mr *MockStoreMockRecorder) SaveSnapshot(arg0, arg1 interface{}) *gomock.Call

SaveSnapshot indicates an expected call of SaveSnapshot.

func (*MockStoreMockRecorder) StreamAggregates

func (mr *MockStoreMockRecorder) StreamAggregates(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamAggregates indicates an expected call of StreamAggregates.

func (*MockStoreMockRecorder) StreamEvents

func (mr *MockStoreMockRecorder) StreamEvents(arg0, arg1 interface{}) *gomock.Call

StreamEvents indicates an expected call of StreamEvents.

func (*MockStoreMockRecorder) StreamProjections

func (mr *MockStoreMockRecorder) StreamProjections(arg0, arg1, arg2, arg3 interface{}) *gomock.Call

StreamProjections indicates an expected call of StreamProjections.

type Projection

type Projection interface {
	Apply(c codec.Codec, e *Event) error
}

Projection is an interface used to represent the query projection.

type ProjectionFactory

type ProjectionFactory interface {
	NewProjection(id string) Projection
}

ProjectionFactory is an interface used to create new projections.
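A sketch of a projection and its factory follows. The method set of codec.Codec is not shown on this page, so the local Codec interface with a single Unmarshal method is purely an assumption, as are the JSON encoding, the userView read model and the "user:renamed" event type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec is a hypothetical stand-in for codec.Codec.
type Codec interface {
	Unmarshal(data []byte, dst interface{}) error
}

type jsonCodec struct{}

func (jsonCodec) Unmarshal(data []byte, dst interface{}) error {
	return json.Unmarshal(data, dst)
}

// Event is a local stand-in for the package type.
type Event struct {
	EventType string
	EventData []byte
}

type Projection interface {
	Apply(c Codec, e *Event) error
}

type ProjectionFactory interface {
	NewProjection(id string) Projection
}

// userView is a hypothetical read model holding the latest user name.
type userView struct {
	ID   string
	Name string
}

// Apply updates the view from events it understands and ignores the rest.
func (v *userView) Apply(c Codec, e *Event) error {
	if e.EventType != "user:renamed" {
		return nil
	}
	var msg struct {
		Name string `json:"name"`
	}
	if err := c.Unmarshal(e.EventData, &msg); err != nil {
		return err
	}
	v.Name = msg.Name
	return nil
}

type userViewFactory struct{}

func (userViewFactory) NewProjection(id string) Projection {
	return &userView{ID: id}
}

func main() {
	var f ProjectionFactory = userViewFactory{}
	p := f.NewProjection("u-1")
	events := []*Event{
		{EventType: "user:renamed", EventData: []byte(`{"name":"Ada"}`)},
		{EventType: "user:renamed", EventData: []byte(`{"name":"Grace"}`)},
	}
	for _, e := range events {
		if err := p.Apply(jsonCodec{}, e); err != nil {
			panic(err)
		}
	}
	fmt.Println(p.(*userView).Name) // prints Grace
}
```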

type Snapshot

type Snapshot struct {
	AggregateId      string `json:"aggregate_id,omitempty"`
	AggregateType    string `json:"aggregate_type,omitempty"`
	AggregateVersion int64  `json:"aggregate_version,omitempty"`
	Revision         int64  `json:"revision,omitempty"`
	Timestamp        int64  `json:"timestamp,omitempty"`
	SnapshotData     []byte `json:"snapshot_data,omitempty"`
}

Snapshot is a structure that defines the basic fields of the aggregate snapshot.

type SnapshotCodec added in v0.0.8

type SnapshotCodec codec.Codec

SnapshotCodec is the type wrapper over the codec.Codec used for wire injection.

type Storage

type Storage interface {
	// SaveEvents saves all input events atomically in the storage.
	SaveEvents(ctx context.Context, es []*Event) error

	// ListEvents lists all events for given aggregate type with given id.
	ListEvents(ctx context.Context, aggId string, aggType string) ([]*Event, error)

	// SaveSnapshot stores a snapshot.
	SaveSnapshot(ctx context.Context, snap *Snapshot) error

	// GetSnapshot gets the snapshot of the aggregate with its id, type and version.
	GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*Snapshot, error)

	// ListEventsFromRevision gets the event stream for given aggregate id, type starting from given revision.
	ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error)

	// NewCursor creates a new cursor of given aggregate type and version.
	NewCursor(ctx context.Context, aggType string, aggVersion int64) (Cursor, error)

	// StreamEvents streams the events that match the given request.
	StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

	// As allows drivers to expose driver-specific types.
	As(dst interface{}) error
}

Storage is the interface used by the event store as a storage for its events and snapshots.
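To give a feel for what a driver has to provide, the sketch below implements three of the Storage methods in memory. It is deliberately partial (no snapshots, cursor, streaming or As), the memStorage type is hypothetical, and treating the from argument of ListEventsFromRevision as an exclusive lower bound is an assumption, since the page does not say whether the bound is inclusive.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Event is a local stand-in for the package type.
type Event struct {
	AggregateType string
	AggregateId   string
	Revision      int64
}

// memStorage is a hypothetical in-memory storage keyed by aggregate type and id.
type memStorage struct {
	mu     sync.RWMutex
	events map[string][]*Event
}

func newMemStorage() *memStorage {
	return &memStorage{events: make(map[string][]*Event)}
}

func streamKey(aggId, aggType string) string { return aggType + "/" + aggId }

// SaveEvents appends the whole batch under one lock, so readers see all of it or none.
func (s *memStorage) SaveEvents(ctx context.Context, es []*Event) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, e := range es {
		k := streamKey(e.AggregateId, e.AggregateType)
		s.events[k] = append(s.events[k], e)
	}
	return nil
}

// ListEvents returns a copy of all events stored for the aggregate.
func (s *memStorage) ListEvents(ctx context.Context, aggId string, aggType string) ([]*Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	stored := s.events[streamKey(aggId, aggType)]
	return append([]*Event(nil), stored...), nil
}

// ListEventsFromRevision returns events with Revision > from (assumed exclusive bound).
func (s *memStorage) ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*Event, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []*Event
	for _, e := range s.events[streamKey(aggId, aggType)] {
		if e.Revision > from {
			out = append(out, e)
		}
	}
	return out, nil
}

// revisions extracts the revision numbers for printing.
func revisions(es []*Event) []int64 {
	out := make([]int64, 0, len(es))
	for _, e := range es {
		out = append(out, e.Revision)
	}
	return out
}

// demo saves three events and lists them fully and after revision 1.
func demo() ([]int64, []int64, error) {
	ctx := context.Background()
	s := newMemStorage()
	batch := []*Event{
		{AggregateType: "order", AggregateId: "o-1", Revision: 1},
		{AggregateType: "order", AggregateId: "o-1", Revision: 2},
		{AggregateType: "order", AggregateId: "o-1", Revision: 3},
	}
	if err := s.SaveEvents(ctx, batch); err != nil {
		return nil, nil, err
	}
	all, err := s.ListEvents(ctx, "o-1", "order")
	if err != nil {
		return nil, nil, err
	}
	tail, err := s.ListEventsFromRevision(ctx, "o-1", "order", 1)
	if err != nil {
		return nil, nil, err
	}
	return revisions(all), revisions(tail), nil
}

func main() {
	all, tail, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(all, tail) // prints [1 2 3] [2 3]
}
```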

type Store added in v0.0.8

type Store struct {
	// contains filtered or unexported fields
}

Store is the default implementation for the EventStore interface.

func New

func New(cfg *Config, eventCodec EventCodec, snapCodec SnapshotCodec, storage Storage) (*Store, error)

New creates a new EventStore implementation.

func (*Store) Commit added in v0.0.8

func (e *Store) Commit(ctx context.Context, agg Aggregate) error

Commit commits provided aggregate events.

func (*Store) CopyWithStorage added in v0.0.9

func (e *Store) CopyWithStorage(storage Storage) *Store

CopyWithStorage creates a copy of the Store structure that has a different storage. This function could be used to create transaction implementations.

func (*Store) LoadEvents added in v0.0.9

func (e *Store) LoadEvents(ctx context.Context, agg Aggregate) error

LoadEvents gets the event stream and applies it to the provided aggregate.

func (*Store) LoadEventsWithSnapshot added in v0.0.9

func (e *Store) LoadEventsWithSnapshot(ctx context.Context, agg Aggregate) error

LoadEventsWithSnapshot gets the aggregate stream with the latest possible snapshot.

func (*Store) SaveSnapshot added in v0.0.8

func (e *Store) SaveSnapshot(ctx context.Context, agg Aggregate) error

SaveSnapshot stores the snapshot.

func (Store) SetAggregateBase added in v0.0.8

func (a Store) SetAggregateBase(agg Aggregate, aggId, aggType string, version int64)

SetAggregateBase implements AggregateBaseSetter interface.

func (*Store) StreamAggregates added in v0.0.8

func (e *Store) StreamAggregates(ctx context.Context, aggType string, aggVersion int64, factory AggregateFactory) (<-chan Aggregate, error)

StreamAggregates opens up the aggregate streaming channel. The channel is closed when there are no more aggregates to read or when the context is done. Closing the resulting channel results in a panic.
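On the consumer side this means reading until the channel is closed and never closing it yourself. The sketch below imitates that contract with a hypothetical stream producer and a collectIDs consumer; the types are reduced local stand-ins, not the package's own.

```go
package main

import (
	"context"
	"fmt"
)

// Reduced local stand-ins for the package types.
type AggregateBase struct{ id string }

func (b *AggregateBase) ID() string { return b.id }

type Aggregate interface {
	AggBase() *AggregateBase
}

type item struct{ base *AggregateBase }

func (i *item) AggBase() *AggregateBase { return i.base }

// stream is a hypothetical producer: it closes the channel when it runs
// out of aggregates or when the context is done.
func stream(ctx context.Context, ids []string) <-chan Aggregate {
	out := make(chan Aggregate)
	go func() {
		defer close(out)
		for _, id := range ids {
			select {
			case out <- &item{base: &AggregateBase{id: id}}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// collectIDs reads until the producer closes the channel or the context ends.
// It only receives from the channel and leaves closing to the producer.
func collectIDs(ctx context.Context, in <-chan Aggregate) ([]string, error) {
	var ids []string
	for {
		select {
		case agg, ok := <-in:
			if !ok {
				return ids, nil
			}
			ids = append(ids, agg.AggBase().ID())
		case <-ctx.Done():
			return ids, ctx.Err()
		}
	}
}

// drainAll wires producer and consumer together under one cancellable context.
func drainAll(ids []string) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the producer if the consumer returns early
	return collectIDs(ctx, stream(ctx, ids))
}

func main() {
	ids, err := drainAll([]string{"a", "b", "c"})
	if err != nil {
		panic(err)
	}
	fmt.Println(ids) // prints [a b c]
}
```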

func (*Store) StreamEvents added in v0.0.8

func (e *Store) StreamEvents(ctx context.Context, req *StreamEventsRequest) (<-chan *Event, error)

StreamEvents opens an event stream that matches given request.

func (*Store) StreamProjections added in v0.0.8

func (e *Store) StreamProjections(ctx context.Context, aggType string, aggVersion int64, factory ProjectionFactory) (<-chan Projection, error)

StreamProjections streams the projection of given aggregate.

type StreamEventsRequest

type StreamEventsRequest struct {
	AggregateTypes    []string
	AggregateIDs      []string
	ExcludeEventTypes []string
	EventTypes        []string
	BuffSize          int
}

StreamEventsRequest is a request for the stream events query.
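The page does not describe how the request fields combine. The sketch below encodes one plausible interpretation as a matches function: an empty filter matches everything, the filters are combined with AND, and ExcludeEventTypes removes events even when EventTypes would include them. These semantics and the helper names are assumptions, not documented behavior.

```go
package main

import "fmt"

// Event is a local stand-in for the package type.
type Event struct {
	AggregateType string
	AggregateId   string
	EventType     string
}

type StreamEventsRequest struct {
	AggregateTypes    []string
	AggregateIDs      []string
	ExcludeEventTypes []string
	EventTypes        []string
	BuffSize          int
}

func contains(list []string, v string) bool {
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// matches reports whether e passes the request filters under the
// assumed semantics described above.
func matches(req *StreamEventsRequest, e *Event) bool {
	if len(req.AggregateTypes) > 0 && !contains(req.AggregateTypes, e.AggregateType) {
		return false
	}
	if len(req.AggregateIDs) > 0 && !contains(req.AggregateIDs, e.AggregateId) {
		return false
	}
	if contains(req.ExcludeEventTypes, e.EventType) {
		return false
	}
	if len(req.EventTypes) > 0 && !contains(req.EventTypes, e.EventType) {
		return false
	}
	return true
}

func main() {
	req := &StreamEventsRequest{
		AggregateTypes:    []string{"order"},
		ExcludeEventTypes: []string{"order:viewed"},
		BuffSize:          16,
	}
	events := []*Event{
		{AggregateType: "order", AggregateId: "o-1", EventType: "order:created"},
		{AggregateType: "order", AggregateId: "o-1", EventType: "order:viewed"},
		{AggregateType: "user", AggregateId: "u-1", EventType: "user:created"},
	}
	for _, e := range events {
		fmt.Println(e.EventType, matches(req, e))
	}
}
```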

type UUIDGenerator

type UUIDGenerator struct{}

UUIDGenerator implements IdGenerator interface. Generates UUID V4 identifier.

func (UUIDGenerator) GenerateId

func (u UUIDGenerator) GenerateId() string

GenerateId generates an identifier. Implements IdGenerator interface.

Directories

Path Synopsis
Package mockes is a generated GoMock package.
sqlxes module
