Documentation
¶
Index ¶
- func Migrate(config *Config, conn *sqlx.DB) error
- type Config
- type Storage
- func (s *Storage) Err(err error) error
- func (s *Storage) GetEventStream(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
- func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
- func (s *Storage) GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
- func (s *Storage) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
- func (s *Storage) SaveEvents(ctx context.Context, es []*eventsource.Event) error
- func (s *Storage) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
- func (s *Storage) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
- type Transaction
- func (t *Transaction) Commit() error
- func (t *Transaction) Conn() (*sqlx.Tx, error)
- func (s *Transaction) Err(err error) error
- func (s *Transaction) GetEventStream(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
- func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
- func (s *Transaction) GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
- func (s *Transaction) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
- func (s *Transaction) SaveEvents(ctx context.Context, es []*eventsource.Event) error
- func (s *Transaction) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
- func (s *Transaction) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct {
EventTable string
SnapshotTable string
SchemaName string // Optional
AggregateTable string
WorkersCount int
}
Config is the configuration for the event storage.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
func (*Storage) GetEventStream ¶
func (s *Storage) GetEventStream(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
GetEventStream gets the event stream for the provided aggregate. Implements the eventsource.Storage interface.
func (*Storage) GetSnapshot ¶
func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
GetSnapshot gets the latest snapshot for the given aggregate. Implements the eventsource.Storage interface.
func (*Storage) GetStreamFromRevision ¶
func (s *Storage) GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
GetStreamFromRevision gets the event stream for the given aggregate, limited to events whose revision is subsequent to the provided one (from).
func (*Storage) NewCursor ¶
func (s *Storage) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
NewCursor creates a new cursor.
func (*Storage) SaveEvents ¶
func (s *Storage) SaveEvents(ctx context.Context, es []*eventsource.Event) error
SaveEvents stores the provided events in the database. Implements the eventsource.Storage interface.
func (*Storage) SaveSnapshot ¶
func (s *Storage) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.
func (*Storage) StreamEvents ¶
func (s *Storage) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
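Transaction is the transactional counterpart of Storage: it exposes the same event and snapshot methods as Storage, plus Commit and Conn, which returns a *sqlx.Tx.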
func (*Transaction) GetEventStream ¶
func (s *Transaction) GetEventStream(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
GetEventStream gets the event stream for the provided aggregate. Implements the eventsource.Storage interface.
func (*Transaction) GetSnapshot ¶
func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
GetSnapshot gets the latest snapshot for the given aggregate. Implements the eventsource.Storage interface.
func (*Transaction) GetStreamFromRevision ¶
func (s *Transaction) GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
GetStreamFromRevision gets the event stream for the given aggregate, limited to events whose revision is subsequent to the provided one (from).
func (*Transaction) NewCursor ¶
func (s *Transaction) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
NewCursor creates a new cursor.
func (*Transaction) SaveEvents ¶
func (s *Transaction) SaveEvents(ctx context.Context, es []*eventsource.Event) error
SaveEvents stores the provided events in the database. Implements the eventsource.Storage interface.
func (*Transaction) SaveSnapshot ¶
func (s *Transaction) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.
func (*Transaction) StreamEvents ¶
func (s *Transaction) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)