es

package
v0.1.6 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// DefaultPageSize is an exported, mutable package-level variable initialised to 50.
// NOTE(review): presumably the page size used when a caller supplies no
// explicit limit — confirm against the loading code, which is not shown here.
var DefaultPageSize = 50

Functions

func WithMeta

func WithMeta(e DomainEvent, meta MetaData)

Types

type Base

// Base bundles an event ID, an occurrence timestamp and a MetaData value.
// The ID, OccurredOn and Meta methods declared on Base have the same
// signatures as the three methods of the DomainEvent interface.
type Base struct {
	// EvtID is written under the JSON key "id".
	EvtID         ID        `json:"id"`
	// EvtOccurredOn is written under the JSON key "occurred-on".
	EvtOccurredOn time.Time `json:"occurred-on"`
	// MetaData is written under the JSON key "meta".
	MetaData      MetaData  `json:"meta"`
}

func MakeBase

func MakeBase() Base

func MakeBaseWithMeta

func MakeBaseWithMeta(meta MetaData) Base

func (Base) ID

func (e Base) ID() ID

func (Base) Meta

func (e Base) Meta() *MetaData

func (Base) OccurredOn

func (e Base) OccurredOn() time.Time

type Codec

type Codec struct {
	// contains filtered or unexported fields
}

func NewCodec

func NewCodec() *Codec

func (*Codec) ContainsEventType

func (codec *Codec) ContainsEventType(v DomainEvent) bool

func (*Codec) ContainsTypeName

func (codec *Codec) ContainsTypeName(typeName string) bool

func (*Codec) Decode

func (codec *Codec) Decode(re RawEvent) (DomainEvent, error)

func (*Codec) Encode

func (codec *Codec) Encode(v DomainEvent) (RawEvent, error)

func (*Codec) EncodeEvents

func (codec *Codec) EncodeEvents(es ...DomainEvent) RawEvents

func (*Codec) Register

func (codec *Codec) Register(typeName string, prototype DomainEvent)

func (*Codec) TypeNames

func (codec *Codec) TypeNames() []string

type CodecUnion

type CodecUnion struct {
	// contains filtered or unexported fields
}

func NewCodecUnion

func NewCodecUnion(codecs ...*Codec) *CodecUnion

func (*CodecUnion) Decode

func (cu *CodecUnion) Decode(re RawEvent) (DomainEvent, error)

func (*CodecUnion) Encode

func (cu *CodecUnion) Encode(v DomainEvent) (RawEvent, error)

type DomainEvent

// DomainEvent is the interface that Codec.Encode accepts and Codec.Decode
// returns. Base declares methods with exactly these three signatures.
type DomainEvent interface {
	// ID returns the event's ID value.
	ID() ID
	// OccurredOn returns a time.Time.
	// NOTE(review): presumably the moment the event happened — confirm.
	OccurredOn() time.Time
	// Meta returns a pointer to a MetaData map.
	Meta() *MetaData
}

type DomainEventStream

type DomainEventStream chan DomainEvent

type DomainEvents

type DomainEvents []DomainEvent

type ExpectedVersionError

type ExpectedVersionError struct {
	// contains filtered or unexported fields
}

func NewExpectedVersionError

func NewExpectedVersionError(exp, curr uint64) ExpectedVersionError

func (ExpectedVersionError) Error

func (e ExpectedVersionError) Error() string

type ID

type ID string

func MakeID

func MakeID() ID

type LimitOffset

// LimitOffset pairs two unsigned counters and is the `lo` argument of the
// Store load and query methods.
// NOTE(review): presumably Limit caps the number of returned events and
// Offset skips that many leading events; the behaviour of a zero Limit is not
// visible here — confirm against the store implementation.
type LimitOffset struct {
	Limit  uint64
	Offset uint64
}

type MetaData

type MetaData map[string]interface{}

func UserMeta

func UserMeta(user string) MetaData

func (*MetaData) Add

func (m *MetaData) Add(typ string, value interface{})

func (MetaData) User

func (md MetaData) User() string

type QueryParams

// QueryParams is the filter argument of Store.Query and
// Store.QueryWithTypePrefix, where it is passed alongside a LimitOffset.
// NOTE(review): the field meanings below are inferred from their names only;
// confirm against the query implementation.
type QueryParams struct {
	// StreamID is a plain string here, not the StreamID type used elsewhere
	// in the package.
	StreamID string
	// ToDate presumably is an upper time bound — confirm which RawEvent
	// timestamp it is compared with and whether the bound is inclusive.
	ToDate   time.Time
	// Type presumably matches RawEvent.Type — confirm.
	Type     string
	// SortASC presumably selects ascending order when true — confirm.
	SortASC  bool
}

type RawEvent

// RawEvent is the record type that Codec.Encode returns and Codec.Decode
// accepts, and that the Store methods append, create, find and load.
// The event payload is kept as undecoded JSON in Data.
//
// NOTE(review): encoding/json's omitempty option never omits struct values,
// so it has no effect on the time.Time fields RecordedOn and OccurredOn; a
// zero time is still written. Confirm whether that is intended.
type RawEvent struct {
	ID          ID              `json:"id,omitempty"`          // the unique id of the event
	StoreIndex  uint64          `json:"store-index"`           // the index of the event within the whole store
	StreamID    string          `json:"stream-id,omitempty"`   // the id of the current stream
	StreamIndex uint64          `json:"stream-index"`          // the index of the event within the current stream
	RecordedOn  time.Time       `json:"recorded-on,omitempty"` // the time the event was first recorded
	OccurredOn  time.Time       `json:"occurred-on,omitempty"` // the time the event occurred
	Type        string          `json:"type"`                  // the type of the domain event
	Data        json.RawMessage `json:"data,omitempty"`        // the data of the domain event
}

type RawEventStream

type RawEventStream chan RawEvent

type RawEvents

type RawEvents []RawEvent

type RawEventsStream

type RawEventsStream chan RawEvents

type SqliteXStore

// SqliteXStore is created by NewSqliteXStore from a file name. Its listed
// pointer-receiver methods match every method of the Store interface.
//
// It embeds *log.Hook, so that type's exported methods are promoted onto
// SqliteXStore.
// NOTE(review): the standard library "log" package has no Hook type, so `log`
// must be another package imported under that name — confirm which one.
type SqliteXStore struct {
	*log.Hook
	// contains filtered or unexported fields
}

func NewSqliteXStore

func NewSqliteXStore(file string) (*SqliteXStore, error)

func (*SqliteXStore) AllStreamIDs

func (s *SqliteXStore) AllStreamIDs() ([]StreamID, error)

func (*SqliteXStore) Append

func (s *SqliteXStore) Append(streamID StreamID, expectedVersion uint64, events ...RawEvent) error

func (*SqliteXStore) Close

func (s *SqliteXStore) Close()

func (*SqliteXStore) Create

func (s *SqliteXStore) Create(events ...RawEvent) error

func (*SqliteXStore) Find

func (s *SqliteXStore) Find(id ID) (RawEvent, bool)

func (*SqliteXStore) LoadLatestFrom

func (s *SqliteXStore) LoadLatestFrom(streamIDs []string) (RawEvents, error)

func (*SqliteXStore) LoadLatestFromAll

func (s *SqliteXStore) LoadLatestFromAll() (RawEvents, error)

func (*SqliteXStore) LoadSlice

func (s *SqliteXStore) LoadSlice(streamID StreamID, lo LimitOffset) (RawEvents, error)

func (*SqliteXStore) LoadSliceDescending

func (s *SqliteXStore) LoadSliceDescending(streamID StreamID, lo LimitOffset) (RawEvents, error)

func (*SqliteXStore) LoadSliceFromVersion

func (s *SqliteXStore) LoadSliceFromVersion(streamID StreamID, version uint64, lo LimitOffset) (RawEvents, error)

func (*SqliteXStore) LoadSliceUntil

func (s *SqliteXStore) LoadSliceUntil(streamID StreamID, lo LimitOffset, until time.Time) (RawEvents, error)

func (*SqliteXStore) PurgeBefore

func (s *SqliteXStore) PurgeBefore(t time.Time) (numDeleted int, err error)

func (*SqliteXStore) Query

func (s *SqliteXStore) Query(params QueryParams, lo LimitOffset) (RawEvents, error)

func (*SqliteXStore) QueryWithTypePrefix

func (s *SqliteXStore) QueryWithTypePrefix(prefix string, params QueryParams, lo LimitOffset) (RawEvents, error)

func (*SqliteXStore) StoreVersion

func (s *SqliteXStore) StoreVersion() uint64

func (*SqliteXStore) StreamVersion

func (s *SqliteXStore) StreamVersion(streamID StreamID) uint64

func (*SqliteXStore) Subscribe

func (s *SqliteXStore) Subscribe(streamID StreamID) *StreamUpdateSubscription

type Store

// Store is the event-store interface that NewStreamer accepts.
// *SqliteXStore declares a method with a matching signature for every entry
// below.
//
// Events cross this interface as RawEvent / RawEvents values. Streams are
// addressed by StreamID, except LoadLatestFrom, which takes plain strings.
type Store interface {
	Close()
	// Subscribe returns a StreamUpdateSubscription for the given stream ID.
	Subscribe(streamID StreamID) *StreamUpdateSubscription
	// StreamVersion and StoreVersion return a uint64 and no error value.
	StreamVersion(streamID StreamID) uint64
	StoreVersion() uint64
	// Append takes an expectedVersion along with the events to write.
	// NOTE(review): presumably a version mismatch is reported as an
	// ExpectedVersionError — confirm against the implementation.
	Append(streamID StreamID, expectedVersion uint64, events ...RawEvent) error
	// Create takes events without a stream ID or expected version.
	Create(events ...RawEvent) error
	// The LoadSlice* methods read from one stream, windowed by a LimitOffset.
	LoadSlice(streamID StreamID, lo LimitOffset) (RawEvents, error)
	LoadSliceUntil(streamID StreamID, lo LimitOffset, until time.Time) (RawEvents, error)
	LoadSliceDescending(streamID StreamID, lo LimitOffset) (RawEvents, error)
	LoadSliceFromVersion(streamID StreamID, version uint64, lo LimitOffset) (RawEvents, error)
	// Query and QueryWithTypePrefix filter by QueryParams, windowed by a LimitOffset.
	Query(params QueryParams, lo LimitOffset) (RawEvents, error)
	QueryWithTypePrefix(prefix string, params QueryParams, lo LimitOffset) (RawEvents, error)
	// Find looks up a single event by ID and reports a bool instead of an error.
	Find(id ID) (RawEvent, bool)

	// PurgeBefore returns the number of deleted events as numDeleted.
	PurgeBefore(t time.Time) (numDeleted int, err error)
	AllStreamIDs() ([]StreamID, error)

	LoadLatestFromAll() (RawEvents, error)
	// LoadLatestFrom takes []string rather than []StreamID.
	LoadLatestFrom(streamIDs []string) (RawEvents, error)
}

type StreamID

// StreamID is the string type taken by the streamID parameters of the Store
// methods and by NewStreamer.
type StreamID string

// StreamIDAll is the StreamID with the literal value "$all".
// NOTE(review): presumably the value IsAll tests for, addressing every stream
// at once — confirm against the IsAll implementation.
const (
	StreamIDAll StreamID = "$all"
)

func (StreamID) IsAll

func (sid StreamID) IsAll() bool

type StreamUpdatePublisher

// StreamUpdatePublisher embeds sync.RWMutex by value, so Lock, Unlock, RLock
// and RUnlock are part of its exported method set. A value that holds a
// sync.RWMutex must not be copied after first use; NewStreamUpdatePublisher
// returns a pointer.
type StreamUpdatePublisher struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewStreamUpdatePublisher

func NewStreamUpdatePublisher() *StreamUpdatePublisher

func (*StreamUpdatePublisher) Close

func (p *StreamUpdatePublisher) Close()

func (*StreamUpdatePublisher) PublishStreamUpdate

func (p *StreamUpdatePublisher) PublishStreamUpdate(streamID StreamID)

func (*StreamUpdatePublisher) Subscribe

type StreamUpdateSubscription

// StreamUpdateSubscription is the type returned by the Subscribe methods of
// Store and SqliteXStore.
type StreamUpdateSubscription struct {
	// C is an exported, bidirectional channel of StreamID values.
	// NOTE(review): presumably it delivers the ID of each updated stream, and
	// whether Close closes it is not visible here — confirm.
	C chan StreamID
	// contains filtered or unexported fields
}

func (*StreamUpdateSubscription) Close

func (sub *StreamUpdateSubscription) Close()

func (*StreamUpdateSubscription) PublishStreamUpdate

func (sub *StreamUpdateSubscription) PublishStreamUpdate(streamID StreamID)

type Streamer

type Streamer struct {
	// contains filtered or unexported fields
}

func NewStreamer

func NewStreamer(store Store, streamID StreamID) *Streamer

func (*Streamer) LoadFrom

func (s *Streamer) LoadFrom(version uint64) RawEventsStream

func (*Streamer) LoadFromCtx

func (s *Streamer) LoadFromCtx(ctx context.Context, version uint64) RawEventsStream

func (*Streamer) LoadFromUntil

func (s *Streamer) LoadFromUntil(version uint64, until time.Time) RawEventsStream

func (*Streamer) LoadFromVersion

func (s *Streamer) LoadFromVersion(version uint64) RawEventsStream

func (*Streamer) StreamFromCtx

func (s *Streamer) StreamFromCtx(ctx context.Context, version uint64) RawEventsStream

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL