Documentation
¶
Index ¶
- Variables
- func WithMeta(e DomainEvent, meta MetaData)
- type Base
- type Codec
- func (codec *Codec) ContainsEventType(v DomainEvent) bool
- func (codec *Codec) ContainsTypeName(typeName string) bool
- func (codec *Codec) Decode(re RawEvent) (DomainEvent, error)
- func (codec *Codec) Encode(v DomainEvent) (RawEvent, error)
- func (codec *Codec) EncodeEvents(es ...DomainEvent) RawEvents
- func (codec *Codec) Register(typeName string, prototype DomainEvent)
- func (codec *Codec) TypeNames() []string
- type CodecUnion
- type DomainEvent
- type DomainEventStream
- type DomainEvents
- type ExpectedVersionError
- type ID
- type LimitOffset
- type MetaData
- type QueryParams
- type RawEvent
- type RawEventStream
- type RawEvents
- type RawEventsStream
- type SqliteXStore
- func (s *SqliteXStore) AllStreamIDs() ([]StreamID, error)
- func (s *SqliteXStore) Append(streamID StreamID, expectedVersion uint64, events ...RawEvent) error
- func (s *SqliteXStore) Close()
- func (s *SqliteXStore) Create(events ...RawEvent) error
- func (s *SqliteXStore) Find(id ID) (RawEvent, bool)
- func (s *SqliteXStore) LoadLatestFrom(streamIDs []string) (RawEvents, error)
- func (s *SqliteXStore) LoadLatestFromAll() (RawEvents, error)
- func (s *SqliteXStore) LoadSlice(streamID StreamID, lo LimitOffset) (RawEvents, error)
- func (s *SqliteXStore) LoadSliceDescending(streamID StreamID, lo LimitOffset) (RawEvents, error)
- func (s *SqliteXStore) LoadSliceFromVersion(streamID StreamID, version uint64, lo LimitOffset) (RawEvents, error)
- func (s *SqliteXStore) LoadSliceUntil(streamID StreamID, lo LimitOffset, until time.Time) (RawEvents, error)
- func (s *SqliteXStore) PurgeBefore(t time.Time) (numDeleted int, err error)
- func (s *SqliteXStore) Query(params QueryParams, lo LimitOffset) (RawEvents, error)
- func (s *SqliteXStore) QueryWithTypePrefix(prefix string, params QueryParams, lo LimitOffset) (RawEvents, error)
- func (s *SqliteXStore) StoreVersion() uint64
- func (s *SqliteXStore) StreamVersion(streamID StreamID) uint64
- func (s *SqliteXStore) Subscribe(streamID StreamID) *StreamUpdateSubscription
- type Store
- type StreamID
- type StreamUpdatePublisher
- type StreamUpdateSubscription
- type Streamer
- func (s *Streamer) LoadFrom(version uint64) RawEventsStream
- func (s *Streamer) LoadFromCtx(ctx context.Context, version uint64) RawEventsStream
- func (s *Streamer) LoadFromUntil(version uint64, until time.Time) RawEventsStream
- func (s *Streamer) LoadFromVersion(version uint64) RawEventsStream
- func (s *Streamer) StreamFromCtx(ctx context.Context, version uint64) RawEventsStream
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultPageSize = 50
Functions ¶
func WithMeta ¶
func WithMeta(e DomainEvent, meta MetaData)
Types ¶
type Base ¶
type Base struct {
EvtID ID `json:"id"`
EvtOccurredOn time.Time `json:"occurred-on"`
MetaData MetaData `json:"meta"`
}
func MakeBaseWithMeta ¶
func (Base) OccurredOn ¶
type Codec ¶
type Codec struct {
// contains filtered or unexported fields
}
func (*Codec) ContainsEventType ¶
func (codec *Codec) ContainsEventType(v DomainEvent) bool
func (*Codec) ContainsTypeName ¶
func (*Codec) EncodeEvents ¶
func (codec *Codec) EncodeEvents(es ...DomainEvent) RawEvents
func (*Codec) Register ¶
func (codec *Codec) Register(typeName string, prototype DomainEvent)
type CodecUnion ¶
type CodecUnion struct {
// contains filtered or unexported fields
}
func NewCodecUnion ¶
func NewCodecUnion(codecs ...*Codec) *CodecUnion
func (*CodecUnion) Decode ¶
func (cu *CodecUnion) Decode(re RawEvent) (DomainEvent, error)
func (*CodecUnion) Encode ¶
func (cu *CodecUnion) Encode(v DomainEvent) (RawEvent, error)
type DomainEventStream ¶
type DomainEventStream chan DomainEvent
type DomainEvents ¶
type DomainEvents []DomainEvent
type ExpectedVersionError ¶
type ExpectedVersionError struct {
// contains filtered or unexported fields
}
func NewExpectedVersionError ¶
func NewExpectedVersionError(exp, curr uint64) ExpectedVersionError
func (ExpectedVersionError) Error ¶
func (e ExpectedVersionError) Error() string
type LimitOffset ¶
type QueryParams ¶
type RawEvent ¶
type RawEvent struct {
ID ID `json:"id,omitempty"` // the unique id of the event
StoreIndex uint64 `json:"store-index"` // the index of the event within the whole store
StreamID string `json:"stream-id,omitempty"` // the id of the current stream
StreamIndex uint64 `json:"stream-index"` // the index of the event within the current stream
RecordedOn time.Time `json:"recorded-on,omitempty"` // the time the event was first recorded
OccurredOn time.Time `json:"occurred-on,omitempty"` // the time the event occurred
Type string `json:"type"` // the type of the domain event
Data json.RawMessage `json:"data,omitempty"` // the data of the domain event
}
type RawEventStream ¶
type RawEventStream chan RawEvent
type RawEventsStream ¶
type RawEventsStream chan RawEvents
type SqliteXStore ¶
func NewSqliteXStore ¶
func NewSqliteXStore(file string) (*SqliteXStore, error)
func (*SqliteXStore) AllStreamIDs ¶
func (s *SqliteXStore) AllStreamIDs() ([]StreamID, error)
func (*SqliteXStore) Append ¶
func (s *SqliteXStore) Append(streamID StreamID, expectedVersion uint64, events ...RawEvent) error
func (*SqliteXStore) Close ¶
func (s *SqliteXStore) Close()
func (*SqliteXStore) Create ¶
func (s *SqliteXStore) Create(events ...RawEvent) error
func (*SqliteXStore) LoadLatestFrom ¶
func (s *SqliteXStore) LoadLatestFrom(streamIDs []string) (RawEvents, error)
func (*SqliteXStore) LoadLatestFromAll ¶
func (s *SqliteXStore) LoadLatestFromAll() (RawEvents, error)
func (*SqliteXStore) LoadSlice ¶
func (s *SqliteXStore) LoadSlice(streamID StreamID, lo LimitOffset) (RawEvents, error)
func (*SqliteXStore) LoadSliceDescending ¶
func (s *SqliteXStore) LoadSliceDescending(streamID StreamID, lo LimitOffset) (RawEvents, error)
func (*SqliteXStore) LoadSliceFromVersion ¶
func (s *SqliteXStore) LoadSliceFromVersion(streamID StreamID, version uint64, lo LimitOffset) (RawEvents, error)
func (*SqliteXStore) LoadSliceUntil ¶
func (s *SqliteXStore) LoadSliceUntil(streamID StreamID, lo LimitOffset, until time.Time) (RawEvents, error)
func (*SqliteXStore) PurgeBefore ¶
func (s *SqliteXStore) PurgeBefore(t time.Time) (numDeleted int, err error)
func (*SqliteXStore) Query ¶
func (s *SqliteXStore) Query(params QueryParams, lo LimitOffset) (RawEvents, error)
func (*SqliteXStore) QueryWithTypePrefix ¶
func (s *SqliteXStore) QueryWithTypePrefix(prefix string, params QueryParams, lo LimitOffset) (RawEvents, error)
func (*SqliteXStore) StoreVersion ¶
func (s *SqliteXStore) StoreVersion() uint64
func (*SqliteXStore) StreamVersion ¶
func (s *SqliteXStore) StreamVersion(streamID StreamID) uint64
func (*SqliteXStore) Subscribe ¶
func (s *SqliteXStore) Subscribe(streamID StreamID) *StreamUpdateSubscription
type Store ¶
type Store interface {
Close()
Subscribe(streamID StreamID) *StreamUpdateSubscription
StreamVersion(streamID StreamID) uint64
StoreVersion() uint64
Append(streamID StreamID, expectedVersion uint64, events ...RawEvent) error
Create(events ...RawEvent) error
LoadSlice(streamID StreamID, lo LimitOffset) (RawEvents, error)
LoadSliceUntil(streamID StreamID, lo LimitOffset, until time.Time) (RawEvents, error)
LoadSliceDescending(streamID StreamID, lo LimitOffset) (RawEvents, error)
LoadSliceFromVersion(streamID StreamID, version uint64, lo LimitOffset) (RawEvents, error)
Query(params QueryParams, lo LimitOffset) (RawEvents, error)
QueryWithTypePrefix(prefix string, params QueryParams, lo LimitOffset) (RawEvents, error)
Find(id ID) (RawEvent, bool)
PurgeBefore(t time.Time) (numDeleted int, err error)
AllStreamIDs() ([]StreamID, error)
LoadLatestFromAll() (RawEvents, error)
LoadLatestFrom(streamIDs []string) (RawEvents, error)
}
type StreamUpdatePublisher ¶
func NewStreamUpdatePublisher ¶
func NewStreamUpdatePublisher() *StreamUpdatePublisher
func (*StreamUpdatePublisher) Close ¶
func (p *StreamUpdatePublisher) Close()
func (*StreamUpdatePublisher) PublishStreamUpdate ¶
func (p *StreamUpdatePublisher) PublishStreamUpdate(streamID StreamID)
func (*StreamUpdatePublisher) Subscribe ¶
func (p *StreamUpdatePublisher) Subscribe(streamID StreamID) *StreamUpdateSubscription
type StreamUpdateSubscription ¶
type StreamUpdateSubscription struct {
C chan StreamID
// contains filtered or unexported fields
}
func (*StreamUpdateSubscription) Close ¶
func (sub *StreamUpdateSubscription) Close()
func (*StreamUpdateSubscription) PublishStreamUpdate ¶
func (sub *StreamUpdateSubscription) PublishStreamUpdate(streamID StreamID)
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
func NewStreamer ¶
func (*Streamer) LoadFrom ¶
func (s *Streamer) LoadFrom(version uint64) RawEventsStream
func (*Streamer) LoadFromCtx ¶
func (s *Streamer) LoadFromCtx(ctx context.Context, version uint64) RawEventsStream
func (*Streamer) LoadFromUntil ¶
func (s *Streamer) LoadFromUntil(version uint64, until time.Time) RawEventsStream
func (*Streamer) LoadFromVersion ¶
func (s *Streamer) LoadFromVersion(version uint64) RawEventsStream
func (*Streamer) StreamFromCtx ¶
func (s *Streamer) StreamFromCtx(ctx context.Context, version uint64) RawEventsStream
Source Files
¶
Click to show internal directories.
Click to hide internal directories.