rangedb

package module
v0.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2020 License: BSD-3-Clause Imports: 4 Imported by: 4

README

RangeDB

Build Status Docker Build Status Go Report Card Test Coverage Maintainability GoDoc Go Version Release Sourcegraph License

An event store database in Go. This package includes a stand-alone database and web server along with a library for embedding event sourced applications.

Examples are provided here.

Docker

docker run -p 8080:8080 inklabs/rangedb

Community

DDD-CQRS-ES slack group:

Documentation

Index

Constants

View Source
const Version = "0.3.0"

Variables

This section is empty.

Functions

func GetEventStream

func GetEventStream(message AggregateMessage) string

GetEventStream returns the stream name for an event.

func GetStream

func GetStream(aggregateType, aggregateID string) string

GetStream returns the stream name for an aggregateType and aggregateID.

func MergeRecordChannelsInOrder

func MergeRecordChannelsInOrder(channels []<-chan *Record, eventNumber uint64) <-chan *Record

MergeRecordChannelsInOrder combines record channels ordered by record.GlobalSequenceNumber.

func ReplayEvents

func ReplayEvents(store Store, eventNumber uint64, subscribers ...RecordSubscriber)

ReplayEvents applies all events to each subscriber.

Types

type AggregateMessage

type AggregateMessage interface {
	AggregateID() string
	AggregateType() string
}

AggregateMessage is the interface that supports building an event stream name.

type Event

type Event interface {
	AggregateMessage
	EventType() string
}

Event is the interface that defines the required event methods.

type EventBinder added in v0.2.4

type EventBinder interface {
	Bind(events ...Event)
}

type EventTypeIdentifier added in v0.3.0

type EventTypeIdentifier interface {
	EventTypeLookup(eventTypeName string) (reflect.Type, bool)
}

type Record

type Record struct {
	AggregateType        string      `msgpack:"a" json:"aggregateType"`
	AggregateID          string      `msgpack:"i" json:"aggregateID"`
	GlobalSequenceNumber uint64      `msgpack:"g" json:"globalSequenceNumber"`
	StreamSequenceNumber uint64      `msgpack:"s" json:"sequenceNumber"`
	InsertTimestamp      uint64      `msgpack:"u" json:"insertTimestamp"`
	EventID              string      `msgpack:"e" json:"eventID"`
	EventType            string      `msgpack:"t" json:"eventType"`
	Data                 interface{} `msgpack:"d" json:"data"`
	Metadata             interface{} `msgpack:"m" json:"metadata"`
}

Record contains event data and metadata.

func ReadNRecords added in v0.3.0

func ReadNRecords(totalEvents uint64, f func(context.Context) <-chan *Record) []*Record

ReadNRecords reads up to N records from the channel returned by f into a slice

type RecordIoStream

type RecordIoStream interface {
	Read(io.Reader) (<-chan *Record, <-chan error)
	Write(io.Writer, <-chan *Record) <-chan error
	Bind(events ...Event)
}

RecordIoStream is the interface that (de)serializes a stream of Records.

type RecordSerializer

type RecordSerializer interface {
	Serialize(record *Record) ([]byte, error)
	Deserialize(data []byte) (*Record, error)
	Bind(events ...Event)
}

RecordSerializer is the interface that (de)serializes Records.

type RecordSubscriber

type RecordSubscriber interface {
	Accept(record *Record)
}

RecordSubscriber is the interface that defines how a projection receives Records.

type RecordSubscriberFunc added in v0.3.0

type RecordSubscriberFunc func(*Record)

The RecordSubscriberFunc type is an adapter to allow the use of ordinary functions as record subscribers. If f is a function with the appropriate signature, RecordSubscriberFunc(f) is a Handler that calls f.

func (RecordSubscriberFunc) Accept added in v0.3.0

func (f RecordSubscriberFunc) Accept(record *Record)

type Store

type Store interface {
	EventBinder
	EventsStartingWith(ctx context.Context, eventNumber uint64) <-chan *Record
	EventsByAggregateTypesStartingWith(ctx context.Context, eventNumber uint64, aggregateTypes ...string) <-chan *Record
	EventsByStreamStartingWith(ctx context.Context, eventNumber uint64, streamName string) <-chan *Record
	Save(event Event, metadata interface{}) error
	SaveEvent(aggregateType, aggregateID, eventType, eventID string, event, metadata interface{}) error
	SubscribeStartingWith(ctx context.Context, eventNumber uint64, subscribers ...RecordSubscriber)
	TotalEventsInStream(streamName string) uint64
}

Store is the interface that stores and retrieves event records.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL