stream

package module
v0.0.0-...-4d4b126 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2023 License: GPL-3.0 Imports: 14 Imported by: 0

README

stream

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrEndOfStream is the error returned by Reader when no more input is available.
	// Reading functions should return ErrEndOfStream only to signal a graceful end of input.
	// If the ErrEndOfStream occurs unexpectedly in a structured data stream,
	// the appropriate error is either ErrWrongPosition or some other error related
	// to underlying Reader
	ErrEndOfStream = Err("stream: end of stream")

	ErrDocumentNotFound = Err("stream:document not found")

	// ErrConcurrentWrite when Appender or Writer is running.
	ErrConcurrentWrite = Err("stream: concurrent write")

	// ErrShortWrite means that a write accepted fewer Message number than requested
	// but failed to return an explicit error.
	ErrShortWrite = Err("stream: short write")
)
View Source
var MemoryEventStore = NewMemoryEventStore()

Functions

func NewMemoryEventStore

func NewMemoryEventStore() *store

Types

type Aggregate

type Aggregate[R Root] struct {
	// contains filtered or unexported fields
}

Aggregate tracks Root state, by keeping Sequence of changes while read and write of uncommitted events. Events are generated by Root and passed to EventStore, on the other hand state of Root rebuild from EventStore to Root To change state of Root

func MustAggregate

func MustAggregate[R Root](id ID, nr NewRoot[R], definitions []event) *Aggregate[R]

func NewAggregate

func NewAggregate[R Root](id ID, nr NewRoot[R], definitions []event) (*Aggregate[R], error)

func (*Aggregate[R]) Events

func (a *Aggregate[R]) Events() Events

func (*Aggregate[R]) GoString

func (a *Aggregate[R]) GoString() string

func (*Aggregate[R]) ID

func (a *Aggregate[R]) ID() string

func (*Aggregate[R]) ReadFrom

func (a *Aggregate[R]) ReadFrom(es EventStore) error

func (*Aggregate[R]) Resource

func (a *Aggregate[R]) Resource() Resource

func (*Aggregate[R]) Run

func (a *Aggregate[R]) Run(c Command[R]) error

Run todo recover panic from Command

Example
package main

import (
	"context"
	"fmt"

	"github.com/sokool/stream"
	"github.com/sokool/stream/example/chat/threads"
)

func main() {
	id, _ := stream.NewID[threads.Thread]("k8Duq81o")
	chat, err := stream.NewAggregate(id, threads.New, threads.Events)
	if err != nil {
		return
	}
	fmt.Println(chat)

	if err = chat.Run(func(t *threads.Thread) error { return t.Start("elo", "dood") }); err != nil {
		return
	}
	fmt.Println(chat)

	if _, err = chat.WriteTo(stream.MemoryEventStore); err != nil {
		return
	}
	fmt.Println(chat)

}

type Person struct {
}

func (p *Person) ID() string {
	//TODO implement me
	return ""
}

func (p *Person) Grant(r ...stream.Role) error {
	for i := range r {
		fmt.Println(r[i])
	}
	return nil
}

func (p *Person) IsGranted(resource ...stream.Resource) error {
	for i := range resource {
		fmt.Println(resource[i])
	}
	return nil
}

func (p *Person) Context() stream.Context {
	return context.TODO()
}
Output:

049fab7a.Thread
049fab7a.Thread->2
049fab7a.Thread#2

func (*Aggregate[R]) String

func (a *Aggregate[R]) String() string

func (*Aggregate[R]) Version

func (a *Aggregate[R]) Version() int64

func (*Aggregate[R]) WriteTo

func (a *Aggregate[R]) WriteTo(es EventStore) (Events, error)

type Aggregates

type Aggregates[R Root] struct {
	// contains filtered or unexported fields
}

Aggregates is todo :)

func NewAggregates

func NewAggregates[R Root](rf NewRoot[R], definitions []event) *Aggregates[R]

func (*Aggregates[R]) CacheInterval

func (a *Aggregates[R]) CacheInterval(d time.Duration) *Aggregates[R]

func (*Aggregates[R]) Compose

func (a *Aggregates[R]) Compose(e *Engine) error

func (*Aggregates[R]) Execute

func (a *Aggregates[R]) Execute(s Session, id string, c Command[R]) error

func (*Aggregates[R]) Get

func (a *Aggregates[R]) Get(s Session, id string) (*Aggregate[R], error)

func (*Aggregates[R]) Grant

func (a *Aggregates[R]) Grant()

func (*Aggregates[R]) Logger

func (a *Aggregates[R]) Logger(l NewLogger) *Aggregates[R]

func (*Aggregates[R]) OnWrite

func (a *Aggregates[R]) OnWrite(fn OnEvent) *Aggregates[R]

func (*Aggregates[R]) Rename

func (a *Aggregates[R]) Rename(s string) *Aggregates[R]

func (*Aggregates[R]) Set

func (a *Aggregates[R]) Set(s Session, r *Aggregate[R]) error

func (*Aggregates[R]) Storage

func (a *Aggregates[R]) Storage(es EventStore) *Aggregates[R]

func (*Aggregates[R]) String

func (a *Aggregates[R]) String() string

func (*Aggregates[R]) Writer

func (a *Aggregates[R]) Writer(w Writer) *Aggregates[R]

type Cache

type Cache[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func NewCache

func NewCache[K comparable, V any](d ...time.Duration) *Cache[K, V]

func (*Cache[K, V]) Get

func (c *Cache[K, V]) Get(key K) (V, bool)

func (*Cache[K, V]) Keys

func (c *Cache[K, V]) Keys() []K

func (*Cache[K, V]) Set

func (c *Cache[K, V]) Set(key K, value V, timeout ...time.Duration) error

type Closer

type Closer interface {
	Close(error) error
}

type Command

type Command[R Root] func(R) error

type Committer

type Committer interface {
	Commit(e event, createdAt time.Time) error //  todo not able to deny it (remove error)
}

type Component

type Component interface {
	Compose(*Engine) error
}

type Configuration

type Configuration struct {
	Name Type
	// Logger
	Logger NewLogger

	// EventStore factory
	EventStore func(Logger) EventStore // todo func not needed
}

type Context

type Context = context.Context

type Date

type Date = time.Time

type Document

type Document interface {
	Entity
	Committer
}

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

func New

func New(c *Configuration) *Engine

func (*Engine) Compose

func (s *Engine) Compose(c ...Component) error

func (*Engine) Run

func (s *Engine) Run()

func (*Engine) Write

func (s *Engine) Write(e Events) (n int, err error)

type Entities

type Entities[E Entity] interface {
	One(E) error // todo One(Events) (E, error)
	Read([]E, []byte) error
	Update(...E) error
	Delete(...E) error
}

func NewMemoryEntities

func NewMemoryEntities[E Entity]() Entities[E]

type Entity

type Entity interface {
	ID() string
	Version() int64
}

type Event

type Event struct {
	// contains filtered or unexported fields
}

func NewEvent

func NewEvent(s Sequence, e event) (Event, error)

func (*Event) Aggregate

func (e *Event) Aggregate() Sequence

func (*Event) Belongs

func (e *Event) Belongs(to ID) bool

func (*Event) Body

func (e *Event) Body() any

func (*Event) Correlate

func (e *Event) Correlate(with Event) *Event

func (*Event) CreatedAt

func (e *Event) CreatedAt() time.Time

func (*Event) Decode

func (e *Event) Decode(b []byte) error

func (*Event) Encode

func (e *Event) Encode() ([]byte, error)

func (*Event) GoString

func (e *Event) GoString() string

func (*Event) ID

func (e *Event) ID() UUID

func (*Event) IsEmpty

func (e *Event) IsEmpty() bool

func (*Event) Name

func (e *Event) Name() string

func (*Event) Resource

func (e *Event) Resource() Resource

func (*Event) Respond

func (e *Event) Respond(to Event) *Event

func (*Event) Role

func (e *Event) Role(id, name string) Role

func (*Event) Sequence

func (e *Event) Sequence() int64

func (*Event) Stream

func (e *Event) Stream() ID

func (*Event) String

func (e *Event) String() string

func (*Event) Type

func (e *Event) Type() Type

type EventStore

type EventStore interface {
	ReadWriter(Sequence) ReadWriterAt
	Reader(Query) Reader
}

type EventStoreFunc

type EventStoreFunc func(*schemas, Logger) EventStore

type Events

type Events []Event

func NewEvents

func NewEvents(s Sequence, events ...event) (ee Events, err error)

func (Events) IsEmpty

func (r Events) IsEmpty() bool

func (Events) IsUnique

func (r Events) IsUnique(id ID) bool

func (Events) Last

func (r Events) Last() Event

func (Events) Resources

func (r Events) Resources() []Resource

func (Events) Size

func (r Events) Size() int

func (Events) String

func (r Events) String() string

func (Events) UUID

func (r Events) UUID() UUID

func (Events) Unique

func (r Events) Unique() ID

Unique gives Sequence when all events has same Sequence

type Filter

type Filter interface {
	// Filtrate todo -> Filtrate([]Message) ([]Message, error)
	Filtrate(Events) (bool, error)
}

type FilterFunc

type FilterFunc func(Events) (bool, error)

func (FilterFunc) Filtrate

func (fn FilterFunc) Filtrate(m Events) (bool, error)

type Filters

type Filters []Filter

func (Filters) Filtrate

func (s Filters) Filtrate(m Events) (bool, error)

type ID

type ID struct {
	// contains filtered or unexported fields
}

func NewID

func NewID[T any](s string) (d ID, err error)

func ParseID

func ParseID(s string) (ID, error)

func (ID) Hash

func (id ID) Hash() UUID

func (ID) IsEmpty

func (id ID) IsEmpty() bool

func (ID) Resource

func (id ID) Resource() Resource

func (ID) Sequence

func (id ID) Sequence() Sequence

func (ID) String

func (id ID) String() string

func (ID) Type

func (id ID) Type() Type

func (ID) UUID

func (id ID) UUID() UUID

type Logger

type Logger = func(string, ...any)

type Meta

type Meta struct {

	// Every Message has 3 UUID's [ID, CorrelationID, CausationID]. When you are
	// responding to a Message (either a Thread or and Event) you copy the
	// CorrelationID of the Message you are responding to, to your new
	// CorrelationID. The CausationID of your Message is the UUID of the
	// Message you are responding to.
	//
	// Greg Young
	// --> https://groups.google.com/d/msg/dddcqrs/qGYC6qZEqOI/LhQup9v7EwAJ
	Correlation, Causation UUID

	// Author helps to check what person/device generate this Message.
	Author string
}

func NewMeta

func NewMeta(c Context) (m Meta, err error)

type NewDocument

type NewDocument[D Document] func(Events) (D, error)

type NewEntities

type NewEntities[E Entity] func() Entities[E]

type NewLogger

type NewLogger func(...string) Logger

type NewRoot

type NewRoot[R Root] func(string) (R, error)

type OnEvent

type OnEvent func(s Session, e Event) error

type Projections

type Projections[D Document] struct {
	Store Entities[D]
	// contains filtered or unexported fields
}

func NewProjections

func NewProjections[D Document](nd NewDocument[D], e ...Entities[D]) *Projections[D]

func (*Projections[D]) Compose

func (p *Projections[D]) Compose(in *Engine) error

func (*Projections[D]) Logger

func (p *Projections[D]) Logger(l NewLogger) *Projections[D]

func (*Projections[D]) Storage

func (p *Projections[D]) Storage(n Entities[D]) *Projections[D]

func (*Projections[D]) Write

func (p *Projections[D]) Write(e Events) (n int, err error)

type Query

type Query struct {
	Stream       UUID
	Root         Type
	Events       []Type
	FromSequence int64
	From, To     time.Time
	Text         string
	NewestFirst  bool
	Shutdown     context.Context
}

Query read stream events

type ReadWriterAt

type ReadWriterAt interface {
	ReaderAt
	WriterAt
}

type ReadWriterCloser

type ReadWriterCloser interface {
	Reader
	Writer
	Closer
}

type Reader

type Reader interface {
	Read(Events) (n int, err error)
}

Reader

type ReaderAt

type ReaderAt interface {
	ReadAt(e Events, pos int64) (n int, err error)
}

ReaderAt todo replace pos with Sequence

type ReaderFrom

type ReaderFrom interface {
	ReadFrom(r Reader) (n int64, err error)
}

type Repository

type Repository interface {
	EventStore(schemas) (EventStore, error)
}

type Resource

type Resource struct {
	ID     string
	Name   string
	Action string
}

func (Resource) Role

func (r Resource) Role(id, name string) Role

func (Resource) String

func (r Resource) String() string

type Role

type Role struct {
	ID        string
	Name      string
	Resources []Resource
}

func (Role) Resource

func (r Role) Resource(id, name, action string) Role

func (Role) String

func (r Role) String() (s string)

type Root

type Root interface {
	//Entity
	ID() string
	Committer
	Uncommitted(clear bool) []event
}

type Schemas

type Schemas map[event]Scheme

type Scheme

type Scheme struct {
	Name        string
	Description string
	Transaction Type
	OnMigrate   func(version int, payload []byte)
}

type Sequence

type Sequence struct {
	// contains filtered or unexported fields
}

Sequence todo

func MustSequence

func MustSequence[T any](id string) Sequence

func NewSequence

func NewSequence[T any](id string, n ...int64) (s Sequence, err error)

func ParseSequence

func ParseSequence(s string) (n Sequence, err error)

func (Sequence) Hash

func (s Sequence) Hash() UUID

func (Sequence) ID

func (s Sequence) ID() ID

func (Sequence) IsEmpty

func (s Sequence) IsEmpty() bool

func (Sequence) MarshalJSON

func (s Sequence) MarshalJSON() ([]byte, error)

func (Sequence) Next

func (s Sequence) Next() Sequence

func (Sequence) Number

func (s Sequence) Number() int64

func (Sequence) Reset

func (s Sequence) Reset()

func (Sequence) Resource

func (s Sequence) Resource() Resource

func (Sequence) String

func (s Sequence) String() string

func (*Sequence) UnmarshalJSON

func (s *Sequence) UnmarshalJSON(b []byte) error

type Serializer

type Serializer interface {
	Encode(e Event) ([]byte, error)
	Decode(e *Event, b []byte) error
}

type Session

type Session interface {
	Grant(...Role) error
	IsGranted(...Resource) error
}

type Type

type Type string

func MustType

func MustType[T any](v ...T) Type

func NewType

func NewType[T any](v ...T) (Type, error)

func (Type) CutPrefix

func (t Type) CutPrefix(of Type) Type

func (Type) Hash

func (t Type) Hash() UUID

func (Type) ID

func (t Type) ID(s string) ID

func (Type) IsEmpty

func (t Type) IsEmpty() bool

func (Type) Rename

func (t Type) Rename(s string) Type

func (Type) String

func (t Type) String() string

func (Type) ToLower

func (t Type) ToLower() string

type Types

type Types map[Type]bool

todo map[ProjectionType][]EventType

func NewTypes

func NewTypes(s ...string) (Types, error)

type UUID

type UUID struct {
	// contains filtered or unexported fields
}

func NewUUID

func NewUUID(s string) UUID

func (UUID) Is

func (u UUID) Is(n string) bool

func (UUID) IsEmpty

func (u UUID) IsEmpty() bool

func (UUID) Short

func (u UUID) Short() string

func (UUID) String

func (u UUID) String() string

type WriteCloser

type WriteCloser interface {
	Writer
	Closer
}

type Writer

type Writer interface {
	Write(Events) (n int, err error)
}

type WriterAt

type WriterAt interface {
	WriteAt(m Events, pos int64) (n int, err error)
}

WriterAt store event in Document starting from pos... todo todo replace pos with Sequence

type WriterTo

type WriterTo interface {
	WriteTo(Writer) (n int64, err error)
}

Directories

Path Synopsis
example
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL