sqlxes

package module
v0.0.8 Latest
Warning

This package is not in the latest version of its module.

Published: May 24, 2021 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Migrate

func Migrate(config *Config, conn *sqlx.DB) error

Migrate runs the table and type migrations for the event store and snapshots. The table names are taken from the config.

Types

type Config

type Config struct {
	EventTable     string
	SnapshotTable  string
	SchemaName     string // Optional
	AggregateTable string
	WorkersCount   int
}

Config is the configuration for the event storage.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig creates a new default config.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the config is valid to use.
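The rules that Validate applies are not listed on this page. As a rough illustration only, the sketch below assumes that the three table names must be non-empty and that WorkersCount must be positive. The local Config type mirrors the documented fields so the example compiles on its own; the validate function and the sample table names are hypothetical and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a local stand-in that mirrors the documented Config fields.
// It is not the package type.
type Config struct {
	EventTable     string
	SnapshotTable  string
	SchemaName     string // Optional
	AggregateTable string
	WorkersCount   int
}

// validate is a hypothetical check. The real Validate rules are not
// documented here; this sketch assumes table names are required and
// the worker count must be positive.
func validate(c *Config) error {
	if c == nil {
		return errors.New("config is nil")
	}
	if c.EventTable == "" {
		return errors.New("event table name is empty")
	}
	if c.SnapshotTable == "" {
		return errors.New("snapshot table name is empty")
	}
	if c.AggregateTable == "" {
		return errors.New("aggregate table name is empty")
	}
	if c.WorkersCount <= 0 {
		return errors.New("workers count must be positive")
	}
	return nil
}

func main() {
	// Sample values are placeholders chosen for the example.
	cfg := &Config{
		EventTable:     "event",
		SnapshotTable:  "snapshot",
		AggregateTable: "aggregate",
		WorkersCount:   4,
	}
	fmt.Println("complete config:", validate(cfg))

	cfg.EventTable = ""
	fmt.Println("missing event table:", validate(cfg))
}
```

SchemaName is left unchecked because the struct marks it as optional.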

type Storage

type Storage struct {
	// contains filtered or unexported fields
}

func New

func New(conn *sqlx.DB, cfg *Config, d xservice.Driver) (*Storage, error)

New creates a new event storage based on the provided sqlx connection.

func (*Storage) Err

func (s *Storage) Err(err error) error

Err handles the error message using the given driver.

func (*Storage) GetEventStream

func (s *Storage) GetEventStream(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)

GetEventStream gets the event stream for the provided aggregate. Implements the eventsource.Storage interface.

func (*Storage) GetSnapshot

func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)

GetSnapshot gets the latest snapshot for the given aggregate. Implements the eventsource.Storage interface.

func (*Storage) GetStreamFromRevision

func (s *Storage) GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)

GetStreamFromRevision gets the event stream for the given aggregate, limited to events whose revision is subsequent to the provided one.
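To make the revision cut-off concrete, here is a minimal in-memory sketch of the same selection. It assumes that "subsequent" means strictly greater than the provided revision, which this page does not confirm. The Event type and its Revision and Type fields are local stand-ins, not the eventsource.Event definition, and eventsAfter is a hypothetical helper.

```go
package main

import "fmt"

// Event is a local stand-in for eventsource.Event. The field names
// are assumptions made for this example.
type Event struct {
	Revision int64
	Type     string
}

// eventsAfter returns the events whose revision is greater than from,
// preserving their order. Treating the bound as exclusive is an assumption.
func eventsAfter(stream []*Event, from int64) []*Event {
	var out []*Event
	for _, e := range stream {
		if e.Revision > from {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	stream := []*Event{
		{Revision: 1, Type: "created"},
		{Revision: 2, Type: "renamed"},
		{Revision: 3, Type: "archived"},
	}
	// Under the exclusive-bound assumption, this keeps revisions 2 and 3.
	for _, e := range eventsAfter(stream, 1) {
		fmt.Println(e.Revision, e.Type)
	}
}
```

A typical reason to read from a revision is to replay only the events recorded after a snapshot, rather than the whole stream.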

func (*Storage) NewCursor

func (s *Storage) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)

NewCursor creates a new cursor.

func (*Storage) SaveEvents

func (s *Storage) SaveEvents(ctx context.Context, es []*eventsource.Event) error

SaveEvents stores the provided events in the database. Implements the eventsource.Storage interface.

func (*Storage) SaveSnapshot

func (s *Storage) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error

SaveSnapshot stores the snapshot in the database. Implements the eventsource.Storage interface.

func (*Storage) StreamEvents

func (s *Storage) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

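Transaction is the transactional counterpart of Storage: it exposes the same event and snapshot methods, plus Commit and Conn, which returns an *sqlx.Tx.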

func (*Transaction) Commit

func (t *Transaction) Commit() error

Commit commits the transaction.

func (*Transaction) Conn

func (t *Transaction) Conn() (*sqlx.Tx, error)

func (*Transaction) Err

func (s *Transaction) Err(err error) error

Err handles the error message using the given driver.

func (*Transaction) GetEventStream

func (s *Transaction) GetEventStream(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)

GetEventStream gets the event stream for the provided aggregate. Implements the eventsource.Storage interface.

func (*Transaction) GetSnapshot

func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)

GetSnapshot gets the latest snapshot for the given aggregate. Implements the eventsource.Storage interface.

func (*Transaction) GetStreamFromRevision

func (s *Transaction) GetStreamFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)

GetStreamFromRevision gets the event stream for the given aggregate, limited to events whose revision is subsequent to the provided one.

func (*Transaction) NewCursor

func (s *Transaction) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)

NewCursor creates a new cursor.

func (*Transaction) SaveEvents

func (s *Transaction) SaveEvents(ctx context.Context, es []*eventsource.Event) error

SaveEvents stores the provided events in the database. Implements the eventsource.Storage interface.

func (*Transaction) SaveSnapshot

func (s *Transaction) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error

SaveSnapshot stores the snapshot in the database. Implements the eventsource.Storage interface.

func (*Transaction) StreamEvents

func (s *Transaction) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
