Documentation
¶
Index ¶
- Variables
- func NewMemoryEventStore() *store
- type Aggregate
- func (a *Aggregate[R]) Events() Events
- func (a *Aggregate[R]) GoString() string
- func (a *Aggregate[R]) ID() string
- func (a *Aggregate[R]) ReadFrom(es EventStore) error
- func (a *Aggregate[R]) Resource() Resource
- func (a *Aggregate[R]) Run(c Command[R]) error
- func (a *Aggregate[R]) String() string
- func (a *Aggregate[R]) Version() int64
- func (a *Aggregate[R]) WriteTo(es EventStore) (Events, error)
- type Aggregates
- func (a *Aggregates[R]) CacheInterval(d time.Duration) *Aggregates[R]
- func (a *Aggregates[R]) Compose(e *Engine) error
- func (a *Aggregates[R]) Execute(s Session, id string, c Command[R]) error
- func (a *Aggregates[R]) Get(s Session, id string) (*Aggregate[R], error)
- func (a *Aggregates[R]) Grant()
- func (a *Aggregates[R]) Logger(l NewLogger) *Aggregates[R]
- func (a *Aggregates[R]) OnWrite(fn OnEvent) *Aggregates[R]
- func (a *Aggregates[R]) Rename(s string) *Aggregates[R]
- func (a *Aggregates[R]) Set(s Session, r *Aggregate[R]) error
- func (a *Aggregates[R]) Storage(es EventStore) *Aggregates[R]
- func (a *Aggregates[R]) String() string
- func (a *Aggregates[R]) Writer(w Writer) *Aggregates[R]
- type Cache
- type Closer
- type Command
- type Committer
- type Component
- type Configuration
- type Context
- type Date
- type Document
- type Engine
- type Entities
- type Entity
- type Event
- func (e *Event) Aggregate() Sequence
- func (e *Event) Belongs(to ID) bool
- func (e *Event) Body() any
- func (e *Event) Correlate(with Event) *Event
- func (e *Event) CreatedAt() time.Time
- func (e *Event) Decode(b []byte) error
- func (e *Event) Encode() ([]byte, error)
- func (e *Event) GoString() string
- func (e *Event) ID() UUID
- func (e *Event) IsEmpty() bool
- func (e *Event) Name() string
- func (e *Event) Resource() Resource
- func (e *Event) Respond(to Event) *Event
- func (e *Event) Role(id, name string) Role
- func (e *Event) Sequence() int64
- func (e *Event) Stream() ID
- func (e *Event) String() string
- func (e *Event) Type() Type
- type EventStore
- type EventStoreFunc
- type Events
- type Filter
- type FilterFunc
- type Filters
- type ID
- type Logger
- type Meta
- type NewDocument
- type NewEntities
- type NewLogger
- type NewRoot
- type OnEvent
- type Projections
- type Query
- type ReadWriterAt
- type ReadWriterCloser
- type Reader
- type ReaderAt
- type ReaderFrom
- type Repository
- type Resource
- type Role
- type Root
- type Schemas
- type Scheme
- type Sequence
- func (s Sequence) Hash() UUID
- func (s Sequence) ID() ID
- func (s Sequence) IsEmpty() bool
- func (s Sequence) MarshalJSON() ([]byte, error)
- func (s Sequence) Next() Sequence
- func (s Sequence) Number() int64
- func (s Sequence) Reset()
- func (s Sequence) Resource() Resource
- func (s Sequence) String() string
- func (s *Sequence) UnmarshalJSON(b []byte) error
- type Serializer
- type Session
- type Type
- type Types
- type UUID
- type WriteCloser
- type Writer
- type WriterAt
- type WriterTo
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // ErrEndOfStream is the error returned by Reader when no more input is available. // Reading functions should return ErrEndOfStream only to signal a graceful end of input. // If the ErrEndOfStream occurs unexpectedly in a structured data stream, // the appropriate error is either ErrWrongPosition or some other error related // to underlying Reader ErrEndOfStream = Err("stream: end of stream") ErrDocumentNotFound = Err("stream:document not found") // ErrConcurrentWrite when Appender or Writer is running. ErrConcurrentWrite = Err("stream: concurrent write") // ErrShortWrite means that a write accepted fewer Message number than requested // but failed to return an explicit error. ErrShortWrite = Err("stream: short write") )
View Source
var Err = errors.Errorf
View Source
var MemoryEventStore = NewMemoryEventStore()
Functions ¶
func NewMemoryEventStore ¶
func NewMemoryEventStore() *store
Types ¶
type Aggregate ¶
type Aggregate[R Root] struct { // contains filtered or unexported fields }
Aggregate tracks the state of a Root by keeping the Sequence of changes while uncommitted events are read and written. Events are generated by the Root and passed to the EventStore; in the other direction, the state of the Root is rebuilt from the EventStore. To change the state of the Root, pass a Command to Run.
func MustAggregate ¶
func NewAggregate ¶
func (*Aggregate[R]) ReadFrom ¶
func (a *Aggregate[R]) ReadFrom(es EventStore) error
func (*Aggregate[R]) Run ¶
Run runs the given Command. TODO: recover from a panic raised by the Command.
Example ¶
package main
import (
"context"
"fmt"
"github.com/sokool/stream"
"github.com/sokool/stream/example/chat/threads"
)
// main walks an Aggregate through three stages and prints it after each one:
// freshly created, after running a command, and after writing its events to
// the in-memory event store. Any failure ends the example silently.
func main() {
	// The error returned by NewID is intentionally discarded in this example.
	id, _ := stream.NewID[threads.Thread]("k8Duq81o")

	chat, err := stream.NewAggregate(id, threads.New, threads.Events)
	if err != nil {
		return
	}
	fmt.Println(chat)

	// Command that starts the thread.
	start := func(t *threads.Thread) error {
		return t.Start("elo", "dood")
	}
	if err = chat.Run(start); err != nil {
		return
	}
	fmt.Println(chat)

	// Hand the aggregate to the package-level in-memory store.
	_, err = chat.WriteTo(stream.MemoryEventStore)
	if err != nil {
		return
	}
	fmt.Println(chat)
}
// Person is an empty helper type declared alongside the example.
// NOTE(review): its method set (ID, Grant, IsGranted, Context) presumably
// satisfies stream.Session — confirm; main does not use it.
type Person struct{}

// ID always returns the empty string; the real implementation is still pending.
func (p *Person) ID() string {
	//TODO implement me
	return ""
}

// Grant prints each given role on its own line and always returns nil.
func (p *Person) Grant(r ...stream.Role) error {
	for _, role := range r {
		fmt.Println(role)
	}
	return nil
}

// IsGranted prints each given resource on its own line and always returns nil.
func (p *Person) IsGranted(resource ...stream.Resource) error {
	for _, res := range resource {
		fmt.Println(res)
	}
	return nil
}

// Context returns context.TODO() as the stream.Context of this Person.
func (p *Person) Context() stream.Context {
	return context.TODO()
}
Output: 049fab7a.Thread 049fab7a.Thread->2 049fab7a.Thread#2
type Aggregates ¶
type Aggregates[R Root] struct { // contains filtered or unexported fields }
Aggregates is todo :)
func NewAggregates ¶
func NewAggregates[R Root](rf NewRoot[R], definitions []event) *Aggregates[R]
func (*Aggregates[R]) CacheInterval ¶
func (a *Aggregates[R]) CacheInterval(d time.Duration) *Aggregates[R]
func (*Aggregates[R]) Compose ¶
func (a *Aggregates[R]) Compose(e *Engine) error
func (*Aggregates[R]) Execute ¶
func (a *Aggregates[R]) Execute(s Session, id string, c Command[R]) error
func (*Aggregates[R]) Get ¶
func (a *Aggregates[R]) Get(s Session, id string) (*Aggregate[R], error)
func (*Aggregates[R]) Grant ¶
func (a *Aggregates[R]) Grant()
func (*Aggregates[R]) Logger ¶
func (a *Aggregates[R]) Logger(l NewLogger) *Aggregates[R]
func (*Aggregates[R]) OnWrite ¶
func (a *Aggregates[R]) OnWrite(fn OnEvent) *Aggregates[R]
func (*Aggregates[R]) Rename ¶
func (a *Aggregates[R]) Rename(s string) *Aggregates[R]
func (*Aggregates[R]) Storage ¶
func (a *Aggregates[R]) Storage(es EventStore) *Aggregates[R]
func (*Aggregates[R]) String ¶
func (a *Aggregates[R]) String() string
func (*Aggregates[R]) Writer ¶
func (a *Aggregates[R]) Writer(w Writer) *Aggregates[R]
type Cache ¶
type Cache[K comparable, V any] struct { // contains filtered or unexported fields }
type Configuration ¶
// Configuration groups the settings handed to New when an Engine is built.
type Configuration struct {
// Name is a Type value.
// NOTE(review): presumably the name the Engine is known under — confirm.
Name Type
// Logger holds a NewLogger value.
// NOTE(review): presumably used to create the Logger passed to EventStore — confirm.
Logger NewLogger
// EventStore is a factory: it receives a Logger and returns the EventStore to use.
EventStore func(Logger) EventStore // todo func not needed
}
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
func New ¶
func New(c *Configuration) *Engine
type Entities ¶
type Entities[E Entity] interface { One(E) error // todo One(Events) (E, error) Read([]E, []byte) error Update(...E) error Delete(...E) error }
func NewMemoryEntities ¶
type EventStore ¶
// EventStore is the storage abstraction Aggregates read from and write to
// (see Aggregate.ReadFrom and Aggregate.WriteTo).
type EventStore interface {
// ReadWriter returns a ReadWriterAt for the given Sequence.
ReadWriter(Sequence) ReadWriterAt
// Reader returns a Reader for the given Query.
Reader(Query) Reader
}
type EventStoreFunc ¶
// EventStoreFunc is a function type that takes *schemas and a Logger and
// returns an EventStore.
type EventStoreFunc func(*schemas, Logger) EventStore
type FilterFunc ¶
type Meta ¶
// Meta carries the correlation and causation identifiers of a Message
// together with its author.
type Meta struct {
// Every Message has 3 UUIDs [ID, CorrelationID, CausationID]. When you are
// responding to a Message (either a Thread or an Event) you copy the
// CorrelationID of the Message you are responding to, to your new
// CorrelationID. The CausationID of your Message is the UUID of the
// Message you are responding to.
//
// Greg Young
// --> https://groups.google.com/d/msg/dddcqrs/qGYC6qZEqOI/LhQup9v7EwAJ
Correlation, Causation UUID
// Author helps to check which person/device generated this Message.
Author string
}
type NewDocument ¶
type NewEntities ¶
type Projections ¶
func NewProjections ¶
func NewProjections[D Document](nd NewDocument[D], e ...Entities[D]) *Projections[D]
func (*Projections[D]) Compose ¶
func (p *Projections[D]) Compose(in *Engine) error
func (*Projections[D]) Logger ¶
func (p *Projections[D]) Logger(l NewLogger) *Projections[D]
func (*Projections[D]) Storage ¶
func (p *Projections[D]) Storage(n Entities[D]) *Projections[D]
type Query ¶
// Query is the argument of EventStore.Reader.
//
// NOTE(review): the field notes below are inferred from field names and types
// only — confirm against the EventStore implementation in use.
type Query struct {
// Stream is a UUID; presumably the stream to read from.
Stream UUID
// Root is a Type; presumably restricts the read to one root type.
Root Type
// Events is a list of Type values; presumably restricts the read to these event types.
Events []Type
// FromSequence is an int64; presumably the sequence number to start reading at.
FromSequence int64
// From and To are timestamps; presumably a time window for the events.
From, To time.Time
// Text is a string; presumably a free-text filter.
Text string
// NewestFirst is a flag; presumably reverses the reading order.
NewestFirst bool
// Shutdown is a context.Context; presumably its cancellation stops the read.
// NOTE(review): the context package advises against storing a Context in a struct.
Shutdown context.Context
}
Query selects the stream events to read.
type ReadWriterAt ¶
type ReadWriterCloser ¶
type ReaderFrom ¶
type Repository ¶
// Repository produces an EventStore from the given schemas.
type Repository interface {
// EventStore takes schemas and returns an EventStore or an error.
EventStore(schemas) (EventStore, error)
}
type Sequence ¶
type Sequence struct {
// contains filtered or unexported fields
}
Sequence todo
func MustSequence ¶
func ParseSequence ¶
func (Sequence) MarshalJSON ¶
func (*Sequence) UnmarshalJSON ¶
type Serializer ¶
type WriteCloser ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.